Skip to content

Instantly share code, notes, and snippets.

@sbeleidy
Last active January 28, 2025 18:20
Show Gist options
  • Save sbeleidy/31294c36660a6a8a70184b2829de8ffd to your computer and use it in GitHub Desktop.
Save sbeleidy/31294c36660a6a8a70184b2829de8ffd to your computer and use it in GitHub Desktop.
Misty Camera Stream Python

This gist is used for the Pigeon embodiment project to get a video stream from a Misty robot and present it to pigeons.

Tasks:

  • Get Misty camera feed
  • Post Misty camera feed in tkinter button
  • Pass tkinter button click X Y coords to Misty motion
import requests
import websockets
from tkinter import Tk
from tkinter import ttk
from PIL import ImageTk, Image
import io
import threading
import asyncio
class MistyHelper:
def __init__(self, ip: str) -> None:
self.ip = ip
self.stream = None
def get_camera(self):
"""Get Misty's camera setting
Returns:
str: True or False
"""
camera = requests.get(f"http://{self.ip}/api/services/camera")
return camera.json()["result"]
def get_av(self):
"""Get Misty's AV setting
Returns:
str: True or False
"""
av = requests.get(f"http://{self.ip}/api/services/avstreaming")
return av.json()["result"]
def start_stream(self, port: int):
"""Start a misty streaming at port
Args:
port (int): The port to use for Misty's rtsp stream
Returns:
requests.Response: The response from Misty starting the stream
"""
if self.get_camera() != "True":
# Enable streaming first
print("Enabling streaming...")
enablePath = f"http://{self.ip}/api/services/camera/enable"
enableStream = requests.post(enablePath)
print(f"Enabling: {enableStream.status_code} {enableStream.json()}")
startPath = f"http://{self.ip}/api/videostreaming/start"
startedStream = requests.post(
startPath,
params={
"Port": port,
"Rotation": 90,
"Width": 1280,
"Height": 960,
"Quality": 10,
},
)
print(f"Starting stream: {startedStream.status_code} {startedStream.json()}")
if startedStream.status_code == 200:
print("Set stream")
self.stream = f"ws://{self.ip}:{port}"
return startedStream
def stop_stream(self):
"""Stop Misty's stream
Returns:
requests.Response: The response from Misty stopping the stream
"""
path = f"http://{self.ip}/api/videostreaming/stop"
stoppedStream = requests.post(path)
if stoppedStream.status_code == 200:
self.stream = None
print(f"Stopped stream: {stoppedStream.status_code} {stoppedStream.json()}")
return stoppedStream
def drive(self, x, y):
"""Move the misty towards an x, y coordinage
Args:
x (int): The horizontal destination
y (int): The vertical destination
"""
# TODO: Update calculations for x,y to velocities
# TODO: Figure out which coordinates we would want to make the robot go backwards (since backwards is never shown)
drivePath = f"http://{self.ip}/api/drive/time"
driveResponse = requests.post(
drivePath,
params={
"LinearVelocity": y,
"AngularVelocity": x,
"timems": 1000,
},
)
print(f"Drive response: {driveResponse.status_code} {driveResponse.json()}")
return driveResponse
class MistyPigeonApp:
def __init__(self, root, misty_ip, ws_port):
self.root = root
self.root.title("Misty Piegon Control")
self.root.bind("<Button 1>", self.button_click)
# Create a button with a placeholder image
self.placeholder_image = Image.new("RGB", (1280, 960), color="gray")
self.tk_image = ImageTk.PhotoImage(self.placeholder_image)
self.button = ttk.Button(root, image=self.tk_image)
self.button.pack()
self.misty_ip = misty_ip
self.ws_port = ws_port
self.connectedMisty = MistyHelper(misty_ip)
self.started = self.connectedMisty.start_stream(ws_port)
self.websocket_thread = threading.Thread(target=self.run_websocket_client)
self.websocket_thread.daemon = True
self.websocket_thread.start()
async def receive_image(self):
uri = f"ws://{self.misty_ip}:{self.ws_port}" # Replace with your WebSocket server URI
async with websockets.connect(uri) as websocket:
while True:
# Receive image data from WebSocket
image_data = await websocket.recv()
# Convert the received data to an image
image = Image.open(io.BytesIO(image_data))
# Update the button image
self.tk_image = ImageTk.PhotoImage(image)
self.button.config(image=self.tk_image)
def run_websocket_client(self):
asyncio.run(self.receive_image())
def button_click(self, eventorigin):
self.connectedMisty.drive(
(eventorigin.x - (1280 / 2)) / (1280 / 2) * 100,
((960 / 2) - eventorigin.y) / (960 / 2) * 100,
)
if __name__ == "__main__":
misty_ip = "192.168.0.50"
ws_port = "5678"
root = Tk()
app = MistyPigeonApp(root, misty_ip, ws_port)
root.mainloop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment