Skip to content

Instantly share code, notes, and snippets.

@raspiduino
Last active May 22, 2025 16:24
Show Gist options
  • Save raspiduino/549803f87bc593c726e55d79eb987b2c to your computer and use it in GitHub Desktop.
Save raspiduino/549803f87bc593c726e55d79eb987b2c to your computer and use it in GitHub Desktop.
Onvif PTZ controller for Yoosee camera (might work for other camera but not tested) with minimum dependencies (only requests)
#!/usr/bin/python3
import os
import base64
import hashlib
from datetime import datetime, timezone
import requests
from http.client import RemoteDisconnected
# This function is literally imported from Gemini, I can't write comments *this* detailed and format *this* properly :)
def generate_soap_header_values(plain_password: str) -> dict:
"""
Generates the 'PasswordDigest', 'Nonce', and 'Created' values
for a SOAP UsernameToken header, following the WSS: SOAP Message Security
specification.
Args:
plain_password (str): The plain-text password (or shared secret)
to be used in the digest calculation.
Returns:
dict: A dictionary containing the generated values:
- 'password_digest': The Base64 encoded SHA-1 hash of (nonce + created + password).
- 'nonce': A Base64 encoded cryptographically random nonce.
- 'created': An ISO 8601 formatted UTC timestamp with milliseconds.
"""
# 1. Generate the Nonce (Number Used Once)
# The spec states a nonce is a cryptographically random value.
# We generate 16 random bytes, which is a common and secure length.
nonce_bytes = os.urandom(16)
# The nonce is then Base64 encoded for inclusion in the XML.
nonce_b64 = base64.b64encode(nonce_bytes).decode('utf-8')
# 2. Generate the Created Timestamp
# Get the current time in UTC.
now_utc = datetime.now(timezone.utc)
# Format the timestamp according to ISO 8601, including milliseconds,
# and replacing the timezone offset with 'Z' to denote UTC.
# Example format: "2025-05-22T11:10:56.324Z"
created_timestamp = now_utc.isoformat(timespec='milliseconds').replace('+00:00', 'Z')
# 3. Generate the PasswordDigest
# The spec defines PasswordDigest as: Base64 ( SHA-1 ( nonce + created + password ) )
# It's crucial that the 'nonce' used here is its decoded (binary) value,
# and 'created' and 'password' are their UTF-8 encoded byte sequences.
# Ensure all components are in bytes for hashing
# nonce_bytes is already in bytes from step 1.
created_bytes = created_timestamp.encode('utf-8')
password_bytes = plain_password.encode('utf-8')
# Concatenate the byte sequences in the specified order: nonce + created + password
# The spec explicitly notes the secret (password) is at the end.
concatenated_bytes = nonce_bytes + created_bytes + password_bytes
# Calculate the SHA-1 hash of the concatenated bytes.
sha1_hash_result = hashlib.sha1(concatenated_bytes).digest()
# Base64 encode the resulting SHA-1 hash.
password_digest_b64 = base64.b64encode(sha1_hash_result).decode('utf-8')
return {
"password_digest": password_digest_b64,
"nonce": nonce_b64,
"created": created_timestamp
}
# (This part is written by hand)
class PTZController():
def __init__(self, url: str, username: str, password: str):
self.onvif_ptz_url = url
self.username = username
self.password = password
'''
Send move command to camera
NOTE: remember to send stop command with send_stop_command!
Cartesian coordinate system is applied relative to camera stream's POV
NOTE: some camera only support moving in ONE axis at a time
Args:
x (float): x-axis velocity (-1.0 to 1.0)
y (float): y-axis velocity (-1.0 to 1.0)
'''
def send_continuous_move_command(self, x: float, y: float):
# Generate SOAP header values
header_values = generate_soap_header_values(self.password)
# Header and data template for continuous move command
headers = {'Content-Type': 'application/soap+xml; charset=utf-8; action="http://www.onvif.org/ver20/ptz/wsdl/ContinuousMove"', 'Connection': 'Close'}
data = f'<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope"><s:Header><Security s:mustUnderstand="1" xmlns="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd"><UsernameToken><Username>{self.username}</Username><Password Type="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-username-token-profile-1.0#PasswordDigest">{header_values["password_digest"]}</Password><Nonce EncodingType="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary">{header_values["nonce"]}</Nonce><Created xmlns="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd">{header_values["created"]}</Created></UsernameToken></Security></s:Header><s:Body xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema"><ContinuousMove xmlns="http://www.onvif.org/ver20/ptz/wsdl"><ProfileToken>IPCProfilesToken0</ProfileToken><Velocity><PanTilt x="{x}" y="{y}" xmlns="http://www.onvif.org/ver10/schema"/></Velocity></ContinuousMove></s:Body></s:Envelope>'
# Send request
try:
# Don't wait for server to return
# On Yoosee if you wait for server to return before sending stop command, chances are your command (especially y-axis) might fail (don't know why)
# We don't need the response anyway
requests.post(self.onvif_ptz_url, headers=headers, data=data, timeout=(None, 0.00001))
except requests.exceptions.ReadTimeout:
# Read timeout is what we expect (with that 00001s timeout)
pass
except Exception as e:
print(f"send_continuous_move_command failed with exception {e}")
def send_stop_command(self):
# Generate SOAP header values
header_values = generate_soap_header_values(self.password)
# Header and data template for stop command
headers = {'Content-Type': 'application/soap+xml; charset=utf-8; action="http://www.onvif.org/ver20/ptz/wsdl/Stop"','Connection': 'Close',}
data = f'<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope"><s:Header><Security s:mustUnderstand="1" xmlns="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd"><UsernameToken><Username>{self.username}</Username><Password Type="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-username-token-profile-1.0#PasswordDigest">{header_values["password_digest"]}=</Password><Nonce EncodingType="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary">{header_values["nonce"]}</Nonce><Created xmlns="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd">{header_values["created"]}</Created></UsernameToken></Security></s:Header><s:Body xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema"><Stop xmlns="http://www.onvif.org/ver20/ptz/wsdl"><ProfileToken>IPCProfilesToken0</ProfileToken><PanTilt>true</PanTilt><Zoom>false</Zoom></Stop></s:Body></s:Envelope>'
# Send request
try:
response = requests.post(self.onvif_ptz_url, headers=headers, data=data)
if not response.ok:
print(f"Camera returned HTTP status code {response.status_code} with content:\n{response.content}")
except Exception as e:
msg = str(e)
# It is NORMAL (at least for Yoosee camera) for stop command NOT TO receive response at all
# So we will just ignore the exception if it's about that
if not "Remote end closed connection without response" in msg:
print(f"send_continuous_move_command failed with exception {msg}")
'''
Move camera with velocity for a really short amount of time
If this timing is not suitable for you, you might manually use send_continuous_move_command and send_stop_command
Cartesian coordinate system is applied relative to camera stream's POV
NOTE: some camera only support moving in ONE axis at a time
Args:
x (float): x-axis velocity
y (float): y-axis velocity
'''
def continuous_move(self, x: float, y: float):
# Send move command
self.send_continuous_move_command(x, y)
# Immidiately stop (HTTP POST time is enough for camera to move for a while)
# If this is not suitable for you, you might manually use send_continuous_move_command and send_stop_command
self.send_stop_command()
def up(self):
self.continuous_move(0, 0.5)
def down(self):
self.continuous_move(0, -0.5)
def left(self):
self.continuous_move(-0.5, 0)
def right(self):
self.continuous_move(0.5, 0)
def up_left(self):
self.up()
self.left()
def up_right(self):
self.up()
self.right()
def down_left(self):
self.down()
self.left()
def down_right(self):
self.down()
self.right()
if __name__ == "__main__":
import sys
if len(sys.argv) != 6:
print("Usage: ptz.py http://ip:port/onvif/deviceio_service username password x_velocity y_velocity")
mycam = PTZController(sys.argv[1], sys.argv[2], sys.argv[3])
mycam.continuous_move(float(sys.argv[4]), float(sys.argv[5]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment