Skip to content

Instantly share code, notes, and snippets.

@c2h2
Last active October 9, 2024 06:51
Show Gist options
  • Save c2h2/062df15758dc146a466da6d66dd049c4 to your computer and use it in GitHub Desktop.
Save c2h2/062df15758dc146a466da6d66dd049c4 to your computer and use it in GitHub Desktop.
# Write a dummy RTSP server for debugging purposes that logs every connection to a separate, time-formatted filename, uses /1 as the stream path, listens on 0.0.0.0:554, and serves a yellow 720p screen. Preferably do not install any additional apt or pip packages; the program must be able to quit with Ctrl-C and run on Windows; log and print all raw socket request and response data.
import base64
import datetime
import os
import random
import socket
import struct
import sys
import threading
import time
import uuid
from urllib.parse import urlsplit
# --- Listener / filesystem settings -----------------------------------------
HOST, PORT = '0.0.0.0', 554  # address and TCP port the RTSP listener binds to
STREAM_PATH = '/1'           # the only stream path this server answers for
LOG_DIR = 'logs'             # directory that receives one log file per connection

# --- RTP settings -----------------------------------------------------------
RTP_VERSION = 2
RTP_PAYLOAD_TYPE = 96                  # dynamic payload type, advertised as H264 in the SDP
RTP_TIMESTAMP_INCREMENT = 90000 // 30  # 90000 Hz clock divided by an assumed 30 fps
RTP_SEQUENCE_NUMBER = 0                # starting RTP sequence number
RTP_SSRC = random.randint(100000, 999999)  # source identifier, picked once per process

# The log directory must exist before the first connection is logged.
os.makedirs(LOG_DIR, exist_ok=True)
# Simple RTSP response templates.
#
# Every header line ends in CRLF and the header section is closed by an empty
# CRLF line. The templates are filled in with str.format(); the placeholder
# names ({cseq}, {session}, ...) are the keyword arguments callers must pass.

# Generic success reply that echoes the request's CSeq and the session id.
RTSP_OK_RESPONSE = (
    "RTSP/1.0 200 OK\r\n"
    "CSeq: {cseq}\r\n"
    "Session: {session}\r\n"
    "\r\n"
)

# Reply to SETUP: confirms the UDP port the client asked RTP to be sent to.
RTSP_SETUP_RESPONSE = (
    "RTSP/1.0 200 OK\r\n"
    "CSeq: {cseq}\r\n"
    "Session: {session}\r\n"
    "Transport: RTP/AVP;unicast;client_port={client_port}\r\n"
    "\r\n"
)

# Reply to DESCRIBE: headers followed by the SDP body.
# The body is exactly {sdp} with nothing appended, so the {length} supplied by
# the caller (the size of the SDP) matches the bytes that follow the blank
# line. A trailing newline here would make the message one byte longer than
# its advertised Content-Length.
RTSP_DESCRIBE_RESPONSE = (
    "RTSP/1.0 200 OK\r\n"
    "CSeq: {cseq}\r\n"
    "Content-Base: rtsp://{host}:{port}{path}\r\n"
    "Content-Type: application/sdp\r\n"
    "Content-Length: {length}\r\n"
    "\r\n"
    "{sdp}"
)

# Reply to OPTIONS: lists the methods the server advertises.
RTSP_OPTIONS_RESPONSE = (
    "RTSP/1.0 200 OK\r\n"
    "CSeq: {cseq}\r\n"
    "Public: DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE\r\n"
    "\r\n"
)
# SDP description returned as the body of a DESCRIBE response.
# It announces a single H264 video stream on a 90000 Hz clock using the
# dynamic payload type configured above. Records are joined with CRLF (the
# line ending SDP specifies) and the trailing empty element terminates the
# last record as well.
SDP_DESCRIPTION = "\r\n".join([
    "v=0",
    f"o=- 0 0 IN IP4 {HOST}",
    "s=Dummy Yellow Stream",
    f"c=IN IP4 {HOST}",
    "t=0 0",
    f"a=control:{STREAM_PATH}",
    f"m=video 0 RTP/AVP {RTP_PAYLOAD_TYPE}",
    f"a=rtpmap:{RTP_PAYLOAD_TYPE} H264/90000",
    f"a=control:streamid={STREAM_PATH}",
    "",
])
def log_connection(client_address):
    """
    Create a unique log file for each connection using UUID.

    The file name is ``<uuid>_<ip>_<port>.log`` inside LOG_DIR, and a first
    line recording the peer address is written before returning.

    :param client_address: ``(ip, port)`` tuple of the connected client.
    :return: The open, append-mode text file object; the caller must close it.
    """
    unique_id = uuid.uuid4()
    log_filename = os.path.join(LOG_DIR, f"{unique_id}_{client_address[0]}_{client_address[1]}.log")
    # Explicit UTF-8: the platform default encoding (e.g. a legacy Windows
    # code page) can raise UnicodeEncodeError when logged request text
    # contains characters it cannot represent.
    log_file = open(log_filename, 'a', encoding='utf-8')
    try:
        log_file.write(f"Connection from {client_address}\n")
    except Exception:
        # Do not leak the handle if the very first write fails.
        log_file.close()
        raise
    return log_file
def log_raw_data(log_file, direction, data):
    """
    Record one timestamped entry in the log file and echo it to the console.

    :param log_file: Writable file object that receives the entry.
    :param direction: Label for the entry, e.g. 'RECV' or 'SEND'.
    :param data: The data to log; it is interpolated into the entry as text.
    """
    now = datetime.datetime.now()
    entry = f"[{now:%Y-%m-%d %H:%M:%S}] {direction}:\n{data}\n"
    log_file.write(entry)
    # Flush right away so the entry reaches the file immediately.
    log_file.flush()
    print(entry)
def create_rtp_packet(sequence_number, timestamp, payload):
    """
    Create an RTP packet with the given sequence number, timestamp, and payload.

    The 12-byte header is built from RTP_VERSION, RTP_PAYLOAD_TYPE and
    RTP_SSRC with padding, extension, CSRC count and marker all zero.

    :param sequence_number: Packet counter; wrapped to 16 bits.
    :param timestamp: Media timestamp; wrapped to 32 bits.
    :param payload: Bytes appended after the header.
    :return: Header plus payload as bytes.
    """
    padding = 0
    extension = 0
    cc = 0
    marker = 0
    first_byte = (RTP_VERSION << 6) | (padding << 5) | (extension << 4) | cc
    second_byte = (marker << 7) | RTP_PAYLOAD_TYPE
    # The sequence number and timestamp are fixed-width header fields.
    # struct.pack raises struct.error for out-of-range values, so wrap them
    # here instead of trusting every caller to do it.
    rtp_header = struct.pack(
        "!BBHII",
        first_byte,
        second_byte,
        sequence_number & 0xFFFF,
        timestamp & 0xFFFFFFFF,
        RTP_SSRC & 0xFFFFFFFF,
    )
    return rtp_header + payload
def send_dummy_rtp(client_info):
    """
    Send dummy RTP packets to the client at approximately 30 fps.

    Runs until ``client_info['is_playing']`` becomes false or sending fails.

    :param client_info: Per-connection dict; reads 'client_ip', 'client_port'
        and, on every iteration, 'is_playing'.
    """
    client_ip, client_port = client_info['client_ip'], client_info['client_port']
    if client_port is None:
        # No port has been stored for this client, so there is nowhere to send.
        print(f"Error sending RTP packets to {client_ip}:{client_port} - no client port negotiated")
        return

    # Keep the sequence counter local to this stream. A module-wide counter
    # would be shared (and raced on) by every concurrently playing client.
    sequence_number = RTP_SEQUENCE_NUMBER & 0xFFFF
    # The RTP timestamp is a 32-bit field. The wall-clock seed multiplied by
    # the per-frame increment is far larger than that, so wrap it up front
    # and after every increment; otherwise packing the header raises
    # struct.error on the very first packet.
    timestamp = (int(time.time()) * RTP_TIMESTAMP_INCREMENT) & 0xFFFFFFFF
    payload = b'\x00' * 160  # Dummy payload, identical for every packet

    rtp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        while client_info['is_playing']:
            rtp_packet = create_rtp_packet(sequence_number, timestamp, payload)
            rtp_socket.sendto(rtp_packet, (client_ip, client_port))
            sequence_number = (sequence_number + 1) % 65536
            timestamp = (timestamp + RTP_TIMESTAMP_INCREMENT) & 0xFFFFFFFF
            time.sleep(1/30)  # 30 fps
    except Exception as e:
        # Top-level boundary of the streaming thread: report and stop.
        print(f"Error sending RTP packets to {client_ip}:{client_port} - {e}")
    finally:
        rtp_socket.close()
def _parse_rtsp_request(text):
    """Split request text into (request-line tokens, dict of lower-cased headers)."""
    request_line, _, header_block = text.partition('\n')
    headers = {}
    for line in header_block.split('\n'):
        if not line.strip():
            break  # a blank line ends the header section
        if ':' in line:
            key, value = line.split(':', 1)
            # Lower-case the names so lookups work whatever casing the client used.
            headers[key.strip().lower()] = value.strip()
    return request_line.split(), headers


def _is_stream_url(url):
    """Return True when url (absolute rtsp:// URL or bare path) addresses STREAM_PATH."""
    try:
        path = urlsplit(url).path
    except ValueError:
        return False
    path = path.rstrip('/')
    # Also accept sub-paths such as "<STREAM_PATH>/streamid=...", which is what
    # a client may build from the media-level control attribute in the SDP.
    return path == STREAM_PATH or path.startswith(STREAM_PATH + '/')


def _client_rtp_port(transport, default=8000):
    """Return the first client_port number in a Transport header value, else default."""
    first_spec = transport.split(',')[0]  # only consider the first transport offered
    for param in first_spec.split(';'):
        name, _, value = param.strip().partition('=')
        if name.lower() == 'client_port':
            first_port = value.split('-')[0].strip()
            if first_port.isdigit():
                return int(first_port)
    return default


def handle_client(conn, addr):
    """
    Handle RTSP client connections.

    Serves one TCP connection until the peer closes it, a TEARDOWN arrives,
    or an error occurs. Everything received and sent is logged through
    log_raw_data. Each recv() chunk is treated as one complete request.

    :param conn: Connected client socket; closed before returning.
    :param addr: ``(ip, port)`` of the client.
    """
    log_file = log_connection(addr)
    session = "12345678"
    client_info = {
        'session': session,
        'client_ip': addr[0],
        'client_port': None,
        'is_playing': False,
        'rtp_thread': None
    }
    try:
        while True:
            data = conn.recv(4096)
            if not data:
                break
            # Valid UTF-8 decodes unchanged; any other byte shows up as a \xNN
            # escape, so the log keeps the data instead of a placeholder.
            data_decoded = data.decode('utf-8', errors='backslashreplace')
            log_raw_data(log_file, "RECV", data_decoded)

            parts, headers = _parse_rtsp_request(data_decoded)
            if len(parts) != 3:
                # Not "METHOD URL PROTOCOL": nothing to answer.
                continue
            method, url, _protocol = parts
            cseq = headers.get('cseq', '1')
            on_stream = _is_stream_url(url)
            teardown = False

            if method == 'OPTIONS':
                response = RTSP_OPTIONS_RESPONSE.format(cseq=cseq)
            elif method == 'DESCRIBE' and on_stream:
                sdp = SDP_DESCRIPTION
                response = RTSP_DESCRIBE_RESPONSE.format(
                    cseq=cseq,
                    host=HOST,
                    port=PORT,
                    path=STREAM_PATH,
                    # Content-Length counts bytes, not characters.
                    length=len(sdp.encode('utf-8')),
                    sdp=sdp
                )
            elif method == 'SETUP' and on_stream:
                # Falls back to port 8000 when the client names no usable port.
                client_info['client_port'] = _client_rtp_port(headers.get('transport', ''))
                response = RTSP_SETUP_RESPONSE.format(
                    cseq=cseq, session=session, client_port=client_info['client_port'])
            elif method in ('DESCRIBE', 'SETUP'):
                response = f"RTSP/1.0 404 Not Found\r\nCSeq: {cseq}\r\n\r\n"
            elif method == 'PLAY':
                if client_info['client_port'] is None:
                    # PLAY before SETUP: there is no destination port yet.
                    response = f"RTSP/1.0 455 Method Not Valid in This State\r\nCSeq: {cseq}\r\n\r\n"
                else:
                    response = RTSP_OK_RESPONSE.format(cseq=cseq, session=session)
                    if not client_info['is_playing']:
                        client_info['is_playing'] = True
                        # Daemon thread: a still-streaming client must not
                        # keep the process alive after Ctrl-C.
                        client_info['rtp_thread'] = threading.Thread(
                            target=send_dummy_rtp, args=(client_info,), daemon=True)
                        client_info['rtp_thread'].start()
            elif method == 'TEARDOWN':
                response = RTSP_OK_RESPONSE.format(cseq=cseq, session=session)
                teardown = True
            else:
                response = f"RTSP/1.0 501 Not Implemented\r\nCSeq: {cseq}\r\n\r\n"

            conn.sendall(response.encode('utf-8'))
            log_raw_data(log_file, "SEND", response)
            if teardown:
                break
    except Exception as e:
        # Top-level boundary of the connection thread: report, log, clean up.
        error_message = f"Error handling client {addr}: {e}"
        print(error_message)
        log_raw_data(log_file, "ERROR", error_message)
    finally:
        # Clearing the flag makes the streaming loop exit; wait for it before
        # closing the connection and the log.
        client_info['is_playing'] = False
        if client_info['rtp_thread']:
            client_info['rtp_thread'].join()
        conn.close()
        try:
            log_file.write("Connection closed.\n")
        finally:
            log_file.close()
def start_server():
    """
    Start the RTSP server and listen for incoming connections.

    Binds to HOST:PORT, then hands every accepted connection to handle_client
    on its own daemon thread. Exits the process with status 1 when binding
    fails and with status 0 on Ctrl-C.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        try:
            server_socket.bind((HOST, PORT))
        except PermissionError:
            print(f"Permission denied: Cannot bind to port {PORT}. Try running as administrator.")
            sys.exit(1)
        except Exception as e:
            print(f"Error binding to port {PORT}: {e}")
            sys.exit(1)
        server_socket.listen(5)
        # Poll accept() with a short timeout instead of blocking forever: on
        # Windows a blocking accept() is not interrupted by Ctrl-C, so the
        # KeyboardInterrupt would only surface after the next client connects.
        server_socket.settimeout(1.0)
        print(f"RTSP server listening on {HOST}:{PORT}")
        try:
            while True:
                try:
                    conn, addr = server_socket.accept()
                except socket.timeout:
                    continue
                # Client sockets must block; only the listener is polled.
                conn.settimeout(None)
                print(f"Accepted connection from {addr}")
                client_thread = threading.Thread(target=handle_client, args=(conn, addr))
                client_thread.daemon = True
                client_thread.start()
        except KeyboardInterrupt:
            print("\nShutting down the server.")
            sys.exit(0)


if __name__ == "__main__":
    start_server()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment