Skip to content

Instantly share code, notes, and snippets.

@umer0586
Created October 16, 2024 17:10
Show Gist options
  • Save umer0586/a5b1247b1999848fe16dda340335dfe6 to your computer and use it in GitHub Desktop.
Save umer0586/a5b1247b1999848fe16dda340335dfe6 to your computer and use it in GitHub Desktop.
import json
import bpy
import signal
import socket
import threading
# Global variable to store the latest quaternion data
latest_quaternion_data = None
server_running = False
def createPhone():
# Check if an object named "phone" already exists
if "phone" in bpy.data.objects:
# Remove the object named "phone"
obj = bpy.data.objects["phone"]
bpy.data.objects.remove(obj, do_unlink=True)
# Create a new cube
bpy.ops.mesh.primitive_cube_add(size=1)
# Get reference to the newly created cube
phone = bpy.context.object
# Rename the cube to "phone"
phone.name = "phone"
# Set the scale of the cube
phone.scale = (2, 4, 0.2)
phone.location = (0,0,0)
phone.rotation_mode = 'QUATERNION'
# Deselect the phone object to hide transform/rotation handles
phone.select_set(False)
bpy.context.view_layer.objects.active = None
return phone
# Call the function to create the phone object
phone = createPhone()
class UDPServer:
def __init__(self, address, buffer_size=1024):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.address = address
self.buffer_size = buffer_size
def setDataCallBack(self, callback):
self.callback = callback
def start(self, daemon=False):
self.thread = threading.Thread(target=self.__listen__)
self.thread.daemon = daemon # Daemon thread, so it terminates with the main program
self.thread.start()
def stop(self):
self.sock.close() # Close the socket to unblock recvfrom
def __listen__(self):
self.sock.bind(self.address)
while True:
try:
data, address = self.sock.recvfrom(self.buffer_size)
if self.callback != None:
self.callback(data)
except socket.error:
break # Break if the socket is closed
# Function to update the cube's rotation in the main thread using quaternion
def update_rotation():
global latest_quaternion_data
if latest_quaternion_data:
x, y, z, w = latest_quaternion_data
global phone
# Apply quaternion rotation to the cube
phone.rotation_quaternion = (w, x, y, z)
# Update the scene
bpy.context.view_layer.update()
# Return a small time interval to keep calling this function
return 0.01
# The onData callback from the UDP server
def onData(data):
global latest_quaternion_data
jsonData = json.loads(data)
sensorType = jsonData["type"]
timestamp = jsonData["timestamp"]
values = jsonData["values"]
if sensorType == "android.sensor.rotation_vector":
# Extract the quaternion data (x, y, z, w)
if len(values) == 4:
x, y, z, w = values
else:
x, y, z = values[:3]
w = (1.0 - (x*x + y*y + z*z))**0.5 # Estimate w if it's not provided
# Update the global variable with the latest quaternion values
latest_quaternion_data = (x, y, z, w)
# Function to safely stop the server
def stop_server(*args):
global server_running
if server_running:
server.stop() # Stop the server (you might need to implement stop in your UDPServer)
server_running = False
print("UDPserver stopped")
bpy.app.timers.unregister(update_rotation)
# Initialize the server to listen on all network interfaces (0.0.0.0) and port 8080
server = UDPServer(address=("0.0.0.0", 8080))
server.setDataCallBack(onData)
server.start()
server_running = True
print("UDP server started : press CTRL+C to stop")
# Register the timer to update the rotation in the main thread
bpy.app.timers.register(update_rotation)
# Handle the Ctrl+C signal
signal.signal(signal.SIGINT, stop_server)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment