Last active
May 22, 2025 16:24
-
-
Save raspiduino/549803f87bc593c726e55d79eb987b2c to your computer and use it in GitHub Desktop.
ONVIF PTZ controller for Yoosee cameras (may work with other cameras, but this is untested) with minimal dependencies (only requests)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import os | |
import base64 | |
import hashlib | |
from datetime import datetime, timezone | |
import requests | |
from http.client import RemoteDisconnected | |
# This function is literally imported from Gemini, I can't write comments *this* detailed and format *this* properly :) | |
def generate_soap_header_values(plain_password: str) -> dict:
    """
    Build the three values a SOAP UsernameToken header needs for digest auth.

    A new random nonce and a new UTC timestamp are produced on every call, so
    two calls with the same password never return the same digest.

    Args:
        plain_password (str): The plain-text password (shared secret) that is
                              mixed into the digest.

    Returns:
        dict: A dictionary with these string entries:
            - 'password_digest': Base64 of SHA-1(nonce + created + password).
            - 'nonce': Base64 of the 16 random nonce bytes.
            - 'created': UTC timestamp in ISO 8601 form with milliseconds,
                         e.g. "2025-05-22T11:10:56.324Z".
    """
    # Nonce: 16 bytes from the OS random source. The raw bytes go into the
    # hash; only the Base64 text form is returned to the caller.
    raw_nonce = os.urandom(16)

    # Created: current UTC time with millisecond precision, with the "+00:00"
    # offset that isoformat() emits swapped for the "Z" suffix.
    stamp = datetime.now(timezone.utc).isoformat(timespec='milliseconds')
    stamp = stamp.replace('+00:00', 'Z')

    # Digest: feed the hash in the order nonce, created, password (the secret
    # comes last). Updating incrementally hashes the same byte stream as
    # hashing the concatenation of the three parts.
    hasher = hashlib.sha1()
    for part in (raw_nonce, stamp.encode('utf-8'), plain_password.encode('utf-8')):
        hasher.update(part)

    return {
        "password_digest": base64.b64encode(hasher.digest()).decode('utf-8'),
        "nonce": base64.b64encode(raw_nonce).decode('utf-8'),
        "created": stamp,
    }
# (This part is written by hand) | |
class PTZController():
    """
    Minimal ONVIF PTZ client that sends hand-built SOAP requests with `requests`.

    Every request carries a WS-Security UsernameToken header whose digest values
    come from generate_soap_header_values(). Failures are reported with print()
    rather than raised, so a flaky camera never crashes the caller.

    Args:
        url (str): URL of the camera's ONVIF PTZ service endpoint.
        username (str): ONVIF user name.
        password (str): ONVIF password (only its digest is sent).
        profile_token (str): Media profile token placed in each request.
                             Defaults to "IPCProfilesToken0".
        timeout (float): Seconds to wait when connecting (both commands) and
                         when waiting for the stop command's response.
    """

    # Namespace / type URIs used in the SOAP envelope.
    _WSSE_NS = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd"
    _WSU_NS = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd"
    _DIGEST_TYPE = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-username-token-profile-1.0#PasswordDigest"
    _NONCE_ENCODING = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary"
    _PTZ_NS = "http://www.onvif.org/ver20/ptz/wsdl"

    def __init__(self, url: str, username: str, password: str,
                 profile_token: str = "IPCProfilesToken0", timeout: float = 10.0):
        self.onvif_ptz_url = url
        self.username = username
        self.password = password
        self.profile_token = profile_token
        self.timeout = timeout

    @staticmethod
    def _xml_text(value) -> str:
        """Escape &, < and > so a value can be placed inside an XML element."""
        # Local import keeps this class self-contained (no new file-level import).
        from xml.sax.saxutils import escape
        return escape(str(value))

    def _headers(self, action: str) -> dict:
        """Return the HTTP headers for the PTZ SOAP action named `action`."""
        return {
            'Content-Type': f'application/soap+xml; charset=utf-8; action="{self._PTZ_NS}/{action}"',
            'Connection': 'Close',
        }

    def _build_envelope(self, body_xml: str, header_values: dict) -> str:
        """
        Wrap `body_xml` in a SOAP 1.2 envelope with a UsernameToken header.

        Args:
            body_xml (str): Already-serialised XML for the SOAP body.
            header_values (dict): Mapping with 'password_digest', 'nonce' and
                                  'created' entries, inserted verbatim.
        """
        return (
            '<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope">'
            '<s:Header>'
            f'<Security s:mustUnderstand="1" xmlns="{self._WSSE_NS}">'
            '<UsernameToken>'
            f'<Username>{self._xml_text(self.username)}</Username>'
            f'<Password Type="{self._DIGEST_TYPE}">{header_values["password_digest"]}</Password>'
            f'<Nonce EncodingType="{self._NONCE_ENCODING}">{header_values["nonce"]}</Nonce>'
            f'<Created xmlns="{self._WSU_NS}">{header_values["created"]}</Created>'
            '</UsernameToken>'
            '</Security>'
            '</s:Header>'
            '<s:Body xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema">'
            f'{body_xml}'
            '</s:Body>'
            '</s:Envelope>'
        )

    def send_continuous_move_command(self, x: float, y: float):
        """
        Send move command to camera.

        NOTE: remember to send stop command with send_stop_command!
        Cartesian coordinate system is applied relative to camera stream's POV.
        NOTE: some cameras only support moving in ONE axis at a time.

        Args:
            x (float): x-axis velocity (-1.0 to 1.0)
            y (float): y-axis velocity (-1.0 to 1.0)
        """
        body = (
            f'<ContinuousMove xmlns="{self._PTZ_NS}">'
            f'<ProfileToken>{self._xml_text(self.profile_token)}</ProfileToken>'
            f'<Velocity><PanTilt x="{x}" y="{y}" xmlns="http://www.onvif.org/ver10/schema"/></Velocity>'
            '</ContinuousMove>'
        )
        data = self._build_envelope(body, generate_soap_header_values(self.password))
        try:
            # Don't wait for the server to answer: the tiny read timeout makes
            # requests give up right after the request has been sent.
            # On Yoosee, waiting for the response before sending the stop command
            # may make the command (especially y-axis) fail (reason unknown).
            # The response is not needed anyway.
            requests.post(self.onvif_ptz_url, headers=self._headers("ContinuousMove"),
                          data=data, timeout=(self.timeout, 0.00001))
        except requests.exceptions.ReadTimeout:
            # Expected outcome of the tiny read timeout above.
            pass
        except Exception as e:
            # Best effort: report and carry on rather than crash the caller.
            print(f"send_continuous_move_command failed with exception {e}")

    def send_stop_command(self):
        """Send the ONVIF Stop command (stops pan/tilt, leaves zoom untouched)."""
        body = (
            f'<Stop xmlns="{self._PTZ_NS}">'
            f'<ProfileToken>{self._xml_text(self.profile_token)}</ProfileToken>'
            '<PanTilt>true</PanTilt><Zoom>false</Zoom>'
            '</Stop>'
        )
        data = self._build_envelope(body, generate_soap_header_values(self.password))
        try:
            # Bounded wait so a silent camera cannot block the caller forever.
            response = requests.post(self.onvif_ptz_url, headers=self._headers("Stop"),
                                     data=data, timeout=self.timeout)
            if not response.ok:
                print(f"Camera returned HTTP status code {response.status_code} with content:\n{response.content}")
        except Exception as e:
            msg = str(e)
            # It is NORMAL (at least for Yoosee cameras) for the stop command NOT
            # to receive any response, so that particular failure is ignored.
            if "Remote end closed connection without response" not in msg:
                print(f"send_stop_command failed with exception {msg}")

    def continuous_move(self, x: float, y: float):
        """
        Move camera with the given velocity for a really short amount of time.

        If this timing is not suitable for you, call send_continuous_move_command
        and send_stop_command manually.
        Cartesian coordinate system is applied relative to camera stream's POV.
        NOTE: some cameras only support moving in ONE axis at a time.

        Args:
            x (float): x-axis velocity
            y (float): y-axis velocity
        """
        self.send_continuous_move_command(x, y)
        # Immediately stop (the HTTP POST time is enough for the camera to move
        # for a while).
        self.send_stop_command()

    def up(self):
        """Nudge the camera up."""
        self.continuous_move(0, 0.5)

    def down(self):
        """Nudge the camera down."""
        self.continuous_move(0, -0.5)

    def left(self):
        """Nudge the camera left."""
        self.continuous_move(-0.5, 0)

    def right(self):
        """Nudge the camera right."""
        self.continuous_move(0.5, 0)

    # Diagonal moves are two single-axis nudges in sequence, which also works
    # on cameras that can only move along one axis at a time.
    def up_left(self):
        """Nudge up, then left."""
        self.up()
        self.left()

    def up_right(self):
        """Nudge up, then right."""
        self.up()
        self.right()

    def down_left(self):
        """Nudge down, then left."""
        self.down()
        self.left()

    def down_right(self):
        """Nudge down, then right."""
        self.down()
        self.right()
if __name__ == "__main__":
    # Command-line entry point: perform one short move with the given velocities.
    import sys

    if len(sys.argv) != 6:
        print("Usage: ptz.py http://ip:port/onvif/deviceio_service username password x_velocity y_velocity")
        # Exit here; otherwise the sys.argv lookups below raise IndexError
        # whenever arguments are missing.
        sys.exit(1)

    mycam = PTZController(sys.argv[1], sys.argv[2], sys.argv[3])
    mycam.continuous_move(float(sys.argv[4]), float(sys.argv[5]))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment