This gist is used for the Pigeon embodiment project to get a video stream from a Misty robot and present it to pigeons.
Tasks:
- Get Misty camera feed
- Post Misty camera feed in tkinter button
- Pass tkinter button click X Y coords to Misty motion
import requests | |
import websockets | |
from tkinter import Tk | |
from tkinter import ttk | |
from PIL import ImageTk, Image | |
import io | |
import threading | |
import asyncio | |
class MistyHelper: | |
def __init__(self, ip: str) -> None: | |
self.ip = ip | |
self.stream = None | |
def get_camera(self): | |
"""Get Misty's camera setting | |
Returns: | |
str: True or False | |
""" | |
camera = requests.get(f"http://{self.ip}/api/services/camera") | |
return camera.json()["result"] | |
def get_av(self): | |
"""Get Misty's AV setting | |
Returns: | |
str: True or False | |
""" | |
av = requests.get(f"http://{self.ip}/api/services/avstreaming") | |
return av.json()["result"] | |
def start_stream(self, port: int): | |
"""Start a misty streaming at port | |
Args: | |
port (int): The port to use for Misty's rtsp stream | |
Returns: | |
requests.Response: The response from Misty starting the stream | |
""" | |
if self.get_camera() != "True": | |
# Enable streaming first | |
print("Enabling streaming...") | |
enablePath = f"http://{self.ip}/api/services/camera/enable" | |
enableStream = requests.post(enablePath) | |
print(f"Enabling: {enableStream.status_code} {enableStream.json()}") | |
startPath = f"http://{self.ip}/api/videostreaming/start" | |
startedStream = requests.post( | |
startPath, | |
params={ | |
"Port": port, | |
"Rotation": 90, | |
"Width": 1280, | |
"Height": 960, | |
"Quality": 10, | |
}, | |
) | |
print(f"Starting stream: {startedStream.status_code} {startedStream.json()}") | |
if startedStream.status_code == 200: | |
print("Set stream") | |
self.stream = f"ws://{self.ip}:{port}" | |
return startedStream | |
def stop_stream(self): | |
"""Stop Misty's stream | |
Returns: | |
requests.Response: The response from Misty stopping the stream | |
""" | |
path = f"http://{self.ip}/api/videostreaming/stop" | |
stoppedStream = requests.post(path) | |
if stoppedStream.status_code == 200: | |
self.stream = None | |
print(f"Stopped stream: {stoppedStream.status_code} {stoppedStream.json()}") | |
return stoppedStream | |
def drive(self, x, y): | |
"""Move the misty towards an x, y coordinage | |
Args: | |
x (int): The horizontal destination | |
y (int): The vertical destination | |
""" | |
# TODO: Update calculations for x,y to velocities | |
# TODO: Figure out which coordinates we would want to make the robot go backwards (since backwards is never shown) | |
drivePath = f"http://{self.ip}/api/drive/time" | |
driveResponse = requests.post( | |
drivePath, | |
params={ | |
"LinearVelocity": y, | |
"AngularVelocity": x, | |
"timems": 1000, | |
}, | |
) | |
print(f"Drive response: {driveResponse.status_code} {driveResponse.json()}") | |
return driveResponse | |
class MistyPigeonApp: | |
def __init__(self, root, misty_ip, ws_port): | |
self.root = root | |
self.root.title("Misty Piegon Control") | |
self.root.bind("<Button 1>", self.button_click) | |
# Create a button with a placeholder image | |
self.placeholder_image = Image.new("RGB", (1280, 960), color="gray") | |
self.tk_image = ImageTk.PhotoImage(self.placeholder_image) | |
self.button = ttk.Button(root, image=self.tk_image) | |
self.button.pack() | |
self.misty_ip = misty_ip | |
self.ws_port = ws_port | |
self.connectedMisty = MistyHelper(misty_ip) | |
self.started = self.connectedMisty.start_stream(ws_port) | |
self.websocket_thread = threading.Thread(target=self.run_websocket_client) | |
self.websocket_thread.daemon = True | |
self.websocket_thread.start() | |
async def receive_image(self): | |
uri = f"ws://{self.misty_ip}:{self.ws_port}" # Replace with your WebSocket server URI | |
async with websockets.connect(uri) as websocket: | |
while True: | |
# Receive image data from WebSocket | |
image_data = await websocket.recv() | |
# Convert the received data to an image | |
image = Image.open(io.BytesIO(image_data)) | |
# Update the button image | |
self.tk_image = ImageTk.PhotoImage(image) | |
self.button.config(image=self.tk_image) | |
def run_websocket_client(self): | |
asyncio.run(self.receive_image()) | |
def button_click(self, eventorigin): | |
self.connectedMisty.drive( | |
(eventorigin.x - (1280 / 2)) / (1280 / 2) * 100, | |
((960 / 2) - eventorigin.y) / (960 / 2) * 100, | |
) | |
if __name__ == "__main__": | |
misty_ip = "192.168.0.50" | |
ws_port = "5678" | |
root = Tk() | |
app = MistyPigeonApp(root, misty_ip, ws_port) | |
root.mainloop() |