Last active
October 9, 2024 06:51
-
-
Save c2h2/062df15758dc146a466da6d66dd049c4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Write a dummy RTSP server for debugging purposes that logs every connection to a separate log file with a time-formatted filename. It uses /1 as the stream path, listens on 0.0.0.0:554, and serves a yellow 720p screen. Preferably do not install any additional apt or pip packages. The program must be able to quit with Ctrl-C, must run on Windows, and must log and print all raw socket request and response data.
import socket | |
import threading | |
import datetime | |
import os | |
import sys | |
import time | |
import random | |
import struct | |
import base64 | |
import uuid | |
# Configuration
HOST = '0.0.0.0'   # bind address: all IPv4 interfaces
PORT = 554         # TCP port the RTSP listener binds to
LOG_DIR = 'logs'   # directory that receives one log file per connection
STREAM_PATH = '/1'  # the only stream path this server describes/sets up

# RTP Configuration
RTP_VERSION = 2
RTP_PAYLOAD_TYPE = 96  # Dynamic payload type for H264
RTP_TIMESTAMP_INCREMENT = 90000 // 30  # 90 kHz clock ticks per frame, assuming 30 fps
RTP_SSRC = random.randint(100000, 999999)
RTP_SEQUENCE_NUMBER = 0

# Ensure the log directory exists
os.makedirs(LOG_DIR, exist_ok=True)

# Simple RTSP response templates.
# Every line is terminated with an explicit "\r\n" so the wire format does not
# depend on how the source file's physical line breaks are stored.
RTSP_OK_RESPONSE = (
    "RTSP/1.0 200 OK\r\n"
    "CSeq: {cseq}\r\n"
    "Session: {session}\r\n"
    "\r\n"
)

RTSP_SETUP_RESPONSE = (
    "RTSP/1.0 200 OK\r\n"
    "CSeq: {cseq}\r\n"
    "Session: {session}\r\n"
    "Transport: RTP/AVP;unicast;client_port={client_port}\r\n"
    "\r\n"
)

# The body is exactly "{sdp}" with nothing appended, so a Content-Length
# computed from the SDP text matches the number of body bytes sent.
RTSP_DESCRIBE_RESPONSE = (
    "RTSP/1.0 200 OK\r\n"
    "CSeq: {cseq}\r\n"
    "Content-Base: rtsp://{host}:{port}{path}\r\n"
    "Content-Type: application/sdp\r\n"
    "Content-Length: {length}\r\n"
    "\r\n"
    "{sdp}"
)

RTSP_OPTIONS_RESPONSE = (
    "RTSP/1.0 200 OK\r\n"
    "CSeq: {cseq}\r\n"
    "Public: DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE\r\n"
    "\r\n"
)

# SDP description (one record per line, CRLF-terminated)
SDP_DESCRIPTION = "\r\n".join([
    "v=0",
    f"o=- 0 0 IN IP4 {HOST}",
    "s=Dummy Yellow Stream",
    f"c=IN IP4 {HOST}",
    "t=0 0",
    f"a=control:{STREAM_PATH}",
    f"m=video 0 RTP/AVP {RTP_PAYLOAD_TYPE}",
    f"a=rtpmap:{RTP_PAYLOAD_TYPE} H264/90000",
    f"a=control:streamid={STREAM_PATH}",
]) + "\r\n"
def log_connection(client_address):
    """
    Create and open a dedicated log file for one client connection.

    The filename is ``<YYYYmmdd_HHMMSS>_<uuid4>_<ip>_<port>.log`` inside
    LOG_DIR: the timestamp makes files sort by connection time, and the UUID
    keeps names unique even when the same peer reconnects within one second.

    :param client_address: ``(ip, port)`` tuple of the connected peer.
    :return: Open text file object; the caller is responsible for closing it.
    :raises OSError: If the log file cannot be created or written.
    """
    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")  # no ':' so the name is valid on Windows
    unique_id = uuid.uuid4()
    log_filename = os.path.join(
        LOG_DIR,
        f"{stamp}_{unique_id}_{client_address[0]}_{client_address[1]}.log",
    )
    # Explicit UTF-8 so logging arbitrary client text cannot fail with
    # UnicodeEncodeError under a narrow platform default encoding; anything
    # still unencodable is written as a backslash escape.
    log_file = open(log_filename, 'a', encoding='utf-8', errors='backslashreplace')
    try:
        log_file.write(f"Connection from {client_address}\n")
    except OSError:
        log_file.close()  # do not leak the handle if the first write fails
        raise
    return log_file
def log_raw_data(log_file, direction, data):
    """
    Record one timestamped entry in the connection log and echo it to stdout.

    :param log_file: Writable file object for this connection's log.
    :param direction: Label for the entry, e.g. 'RECV', 'SEND' or 'ERROR'.
    :param data: The content to record; rendered with normal string formatting.
    """
    now = datetime.datetime.now()
    entry = f"[{now:%Y-%m-%d %H:%M:%S}] {direction}:\n{data}\n"

    # Flush right away so the log on disk is current even if the process dies.
    log_file.write(entry)
    log_file.flush()

    print(entry)
def create_rtp_packet(sequence_number, timestamp, payload):
    """
    Build an RTP packet: a 12-byte fixed header followed by ``payload``.

    The header is packed as ``!BBHII``: one byte holding version/padding/
    extension/CSRC-count, one byte holding marker/payload-type, a 16-bit
    sequence number, a 32-bit timestamp and the 32-bit SSRC.

    :param sequence_number: Packet sequence number; reduced modulo 2**16.
    :param timestamp: Media timestamp; reduced modulo 2**32.
    :param payload: Payload bytes appended after the header.
    :return: The packet as ``bytes``.
    """
    padding = 0
    extension = 0
    cc = 0
    marker = 0

    first_byte = (RTP_VERSION << 6) | (padding << 5) | (extension << 4) | cc
    second_byte = (marker << 7) | (RTP_PAYLOAD_TYPE & 0x7F)

    # Mask each field to its on-wire width. Without this, struct.pack raises
    # struct.error as soon as a counter grows past its field size (e.g. a
    # timestamp seeded from wall-clock time) instead of wrapping around.
    rtp_header = struct.pack(
        "!BBHII",
        first_byte,
        second_byte,
        sequence_number & 0xFFFF,
        timestamp & 0xFFFFFFFF,
        RTP_SSRC & 0xFFFFFFFF,
    )
    return rtp_header + payload
def send_dummy_rtp(client_info):
    """
    Send dummy RTP packets over UDP to one client at approximately 30 fps.

    Runs until ``client_info['is_playing']`` becomes false; it is intended to
    be the target of a dedicated thread. The payload is a fixed block of zero
    bytes, i.e. a placeholder rather than encoded video.

    :param client_info: Per-connection dict; reads 'client_ip', 'client_port'
        and 'is_playing'.
    """
    client_ip, client_port = client_info['client_ip'], client_info['client_port']
    if client_port is None:
        # PLAY without a prior SETUP: there is nowhere to send packets.
        print(f"Error sending RTP packets to {client_ip}:{client_port} - no client port negotiated")
        return

    # The sequence number is kept per stream: a counter shared between client
    # threads would leave gaps in every client's sequence.
    sequence_number = random.randint(0, 0xFFFF)
    # The RTP timestamp field is 32 bits wide; seeding it from wall-clock time
    # must be reduced modulo 2**32 or header packing fails on the first packet.
    timestamp = (int(time.time()) * RTP_TIMESTAMP_INCREMENT) & 0xFFFFFFFF
    payload = b'\x00' * 160  # Dummy payload

    rtp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        while client_info['is_playing']:
            rtp_packet = create_rtp_packet(sequence_number, timestamp, payload)
            rtp_socket.sendto(rtp_packet, (client_ip, client_port))
            sequence_number = (sequence_number + 1) & 0xFFFF
            timestamp = (timestamp + RTP_TIMESTAMP_INCREMENT) & 0xFFFFFFFF
            time.sleep(1 / 30)  # 30 fps
    except Exception as e:
        # Top of the streaming thread: report and stop rather than crash silently.
        print(f"Error sending RTP packets to {client_ip}:{client_port} - {e}")
    finally:
        rtp_socket.close()
def _parse_rtsp_request(text):
    """Split request text into (request-line tokens, headers keyed by lowercase name)."""
    lines = text.split('\n')
    parts = lines[0].strip().split()
    headers = {}
    for line in lines[1:]:
        name, sep, value = line.partition(':')
        if sep:
            # Header names are matched case-insensitively ("CSeq" vs "Cseq").
            headers[name.strip().lower()] = value.strip()
    return parts, headers


def _matches_stream_path(url):
    """Return True if the request URL (absolute rtsp:// or bare path) targets STREAM_PATH."""
    from urllib.parse import urlsplit

    try:
        path = urlsplit(url).path.rstrip('/')
    except ValueError:
        return False
    expected = STREAM_PATH.rstrip('/')
    # Also accept sub-paths such as "<STREAM_PATH>/streamid=...", which is what
    # a client produces when it appends the media-level control attribute.
    return path == expected or path.startswith(expected + '/')


def _parse_client_port(transport, default=8000):
    """Return the first client_port from a Transport header value, or ``default``."""
    first_spec = transport.split(',')[0]  # only the first offered transport
    for param in first_spec.split(';'):
        name, _, value = param.strip().partition('=')
        if name.lower() != 'client_port':
            continue
        try:
            port = int(value.split('-')[0])  # "5000-5001" -> RTP port 5000
        except ValueError:
            break
        if 0 < port < 65536:
            return port
        break
    return default


def _stop_streaming(client_info):
    """Signal the RTP sender thread (if any) to stop and wait for it to finish."""
    client_info['is_playing'] = False
    rtp_thread = client_info['rtp_thread']
    if rtp_thread is not None:
        rtp_thread.join()
        client_info['rtp_thread'] = None


def handle_client(conn, addr):
    """
    Handle one RTSP client connection until it closes or sends TEARDOWN.

    Each chunk returned by ``recv`` is logged and then parsed as a single RTSP
    request (NOTE: requests split across TCP segments, or several requests in
    one segment, are not reassembled). Supported methods are OPTIONS,
    DESCRIBE, SETUP, PLAY, PAUSE and TEARDOWN; anything else gets 501.

    :param conn: Connected TCP socket for the client; closed on return.
    :param addr: ``(ip, port)`` of the client.
    """
    try:
        log_file = log_connection(addr)
    except OSError as e:
        print(f"Error handling client {addr}: {e}")
        conn.close()
        return

    session = "12345678"
    client_info = {
        'session': session,
        'client_ip': addr[0],
        'client_port': None,
        'is_playing': False,
        'rtp_thread': None
    }
    try:
        while True:
            data = conn.recv(4096)
            if not data:
                break

            # Undecodable bytes become backslash escapes, so the log still
            # shows what was received and parsing can continue.
            data_decoded = data.decode('utf-8', errors='backslashreplace')
            log_raw_data(log_file, "RECV", data_decoded)

            parts, headers = _parse_rtsp_request(data_decoded)
            cseq = headers.get('cseq', '1')

            if len(parts) < 3:
                if not data_decoded.strip():
                    continue  # blank keep-alive lines need no reply
                response = f"RTSP/1.0 400 Bad Request\r\nCSeq: {cseq}\r\n\r\n"
                conn.sendall(response.encode('utf-8'))
                log_raw_data(log_file, "SEND", response)
                continue

            method, url = parts[0], parts[1]

            if method == 'OPTIONS':
                response = RTSP_OPTIONS_RESPONSE.format(cseq=cseq)
            elif method in ('DESCRIBE', 'SETUP') and not _matches_stream_path(url):
                response = f"RTSP/1.0 404 Not Found\r\nCSeq: {cseq}\r\n\r\n"
            elif method == 'DESCRIBE':
                sdp = SDP_DESCRIPTION
                # Advertise the address the client actually connected to;
                # the wildcard bind address is not reachable by clients.
                local_host, local_port = conn.getsockname()[:2]
                response = RTSP_DESCRIBE_RESPONSE.format(
                    cseq=cseq,
                    host=local_host,
                    port=local_port,
                    path=STREAM_PATH,
                    length=len(sdp.encode('utf-8')),
                    sdp=sdp
                )
            elif method == 'SETUP':
                # Falls back to port 8000 when the client names no usable port.
                client_info['client_port'] = _parse_client_port(headers.get('transport', ''))
                response = RTSP_SETUP_RESPONSE.format(
                    cseq=cseq, session=session, client_port=client_info['client_port'])
            elif method == 'PLAY':
                if client_info['client_port'] is None:
                    # No SETUP yet, so there is no RTP destination.
                    response = f"RTSP/1.0 455 Method Not Valid in This State\r\nCSeq: {cseq}\r\n\r\n"
                else:
                    response = RTSP_OK_RESPONSE.format(cseq=cseq, session=session)
                    if not client_info['is_playing']:
                        client_info['is_playing'] = True
                        # Daemon thread: it must not keep the process alive
                        # after Ctrl-C while a client is still playing.
                        client_info['rtp_thread'] = threading.Thread(
                            target=send_dummy_rtp, args=(client_info,), daemon=True)
                        client_info['rtp_thread'].start()
            elif method == 'PAUSE':
                # Advertised in the OPTIONS response, so it has to be honoured.
                _stop_streaming(client_info)
                response = RTSP_OK_RESPONSE.format(cseq=cseq, session=session)
            elif method == 'TEARDOWN':
                response = RTSP_OK_RESPONSE.format(cseq=cseq, session=session)
                conn.sendall(response.encode('utf-8'))
                log_raw_data(log_file, "SEND", response)
                break
            else:
                response = f"RTSP/1.0 501 Not Implemented\r\nCSeq: {cseq}\r\n\r\n"

            conn.sendall(response.encode('utf-8'))
            log_raw_data(log_file, "SEND", response)
    except Exception as e:
        # Top of the per-client thread: record the failure and clean up.
        error_message = f"Error handling client {addr}: {e}"
        print(error_message)
        log_raw_data(log_file, "ERROR", error_message)
    finally:
        _stop_streaming(client_info)
        conn.close()
        try:
            log_file.write("Connection closed.\n")
        finally:
            log_file.close()
def start_server():
    """
    Start the RTSP server and serve incoming connections until Ctrl-C.

    Binds a TCP listener on (HOST, PORT) and hands every accepted connection
    to ``handle_client`` on its own daemon thread.

    :raises SystemExit: With status 1 if the port cannot be bound, and with
        status 0 after a KeyboardInterrupt.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        if os.name != 'nt':
            # Allow an immediate restart while old connections linger. Skipped
            # on Windows, where this option has different semantics.
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            server_socket.bind((HOST, PORT))
        except PermissionError:
            print(f"Permission denied: Cannot bind to port {PORT}. Try running as administrator.")
            sys.exit(1)
        except Exception as e:
            print(f"Error binding to port {PORT}: {e}")
            sys.exit(1)

        server_socket.listen(5)
        # Wake up periodically: a blocking accept() on Windows does not return
        # on Ctrl-C, so KeyboardInterrupt would only be seen after the next
        # client connects.
        server_socket.settimeout(1.0)
        print(f"RTSP server listening on {HOST}:{PORT}")

        try:
            while True:
                try:
                    conn, addr = server_socket.accept()
                except socket.timeout:
                    continue
                conn.settimeout(None)  # client sockets stay fully blocking
                print(f"Accepted connection from {addr}")
                client_thread = threading.Thread(target=handle_client, args=(conn, addr))
                client_thread.daemon = True
                client_thread.start()
        except KeyboardInterrupt:
            print("\nShutting down the server.")
            sys.exit(0)


if __name__ == "__main__":
    start_server()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment