Skip to content

Instantly share code, notes, and snippets.

@codedokode
Created March 22, 2020 13:41
Show Gist options
  • Save codedokode/12d852f999c35fd73ff892a3b2c19279 to your computer and use it in GitHub Desktop.
Save codedokode/12d852f999c35fd73ff892a3b2c19279 to your computer and use it in GitHub Desktop.
Test Python script for xdg-dbus-proxy
import os
import socket
import sys
import binascii
from pprint import pprint
# Tests xdg-dbus-proxy by sending different messages to check whether it
# would crash or not.
#
# How to use:
#
# Compile debug version of xdg-dbus-proxy with address sanitizer
# (make -e CFLAGS="-g -O0 -fsanitize=address" LDFLAGS="-fsanitize=address")
#
# Start it, e.g.
#
# xdg-dbus-proxy "$DBUS_SESSION_BUS_ADDRESS" /tmp/test1 --log
#
#
# Then run this script using pytest (pytest captures the printed message
# dumps by default; add -s to see them live):
#
# SOCKET_PATH=/tmp/test1 pytest-3 test-xdg-proxy.py
#
# Filesystem path of the proxy's Unix socket, taken from the SOCKET_PATH
# environment variable; None when the variable is not set.
SOCKET_PATH = os.environ.get('SOCKET_PATH')
def send_to_proxy(messages):
    """Connect to the proxy socket and send *messages* one after another.

    A new AF_UNIX stream connection to SOCKET_PATH is opened for every call.
    Each message is hex-dumped and sent in blocking mode; afterwards one
    non-blocking read is attempted and whatever reply is already available
    is dumped as well.  If the proxy has closed the connection
    (BrokenPipeError on send), a notice is printed and the remaining
    messages are skipped.

    Args:
        messages: Iterable of bytes-like objects to send, in order.
    """
    # The context manager closes the socket on every exit path, including
    # the early return below and any unexpected exception.
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        sock.connect(SOCKET_PATH)

        for message in messages:
            sock.setblocking(True)
            dump_message('SEND', message)
            try:
                sock.sendall(message)
            except BrokenPipeError:
                print("Proxy has disconnected")
                return

            # Only peek for a reply: do not wait if nothing has arrived yet.
            sock.setblocking(False)
            try:
                # No ancillary buffer size is passed, so ancdata is normally
                # an empty list.
                data, ancdata, _msg_flags, _address = sock.recvmsg(16384)
            except BlockingIOError:
                continue
            dump_message('RECV', data)
            dump_ancdata(ancdata)
def dump_message(type, data):
    """Print a hex dump of *data* to stdout, 16 bytes per row.

    Every row has three columns: the label (first row only), the bytes as
    two-digit hex values, and the same bytes as ASCII with each
    non-printable byte shown as ``.``.  A blank line is printed after the
    dump, so empty *data* produces just that blank line.

    Args:
        type: Short label for the first row, e.g. ``'SEND'`` or ``'RECV'``.
            The name shadows the builtin; it is kept so existing callers
            keep working.
        data: Bytes-like object to dump.
    """
    fmt = '{:8} {:48} {:16}'
    for start in range(0, len(data), 16):
        row = data[start:start + 16]
        hex_column = " ".join("{:02x}".format(byte) for byte in row)
        # Printable ASCII is 0x20..0x7e.  0x7f (DEL) is a control character
        # and must be masked like the others.
        ascii_column = "".join(
            chr(byte) if 0x20 <= byte < 0x7f else '.' for byte in row
        )
        print(fmt.format(type if start == 0 else '', hex_column, ascii_column))
    print("")
def dump_ancdata(ancdata):
    """Pretty-print the ancillary data returned by ``recvmsg`` to stdout.

    Args:
        ancdata: The ancillary-data value to print; it is passed to
            ``pprint`` unchanged.
    """
    pprint(ancdata)
def make_garbage(data, xor_byte, offset, step=1):
    """Return a corrupted copy of *data*.

    Every ``step``-th byte, starting at index ``offset``, is XOR-ed with
    ``xor_byte``.  The input is not modified.

    Args:
        data: Bytes-like object to corrupt.
        xor_byte: Integer in 0..255 XOR-ed into each selected byte.
        offset: Index of the first byte to change.
        step: Distance between changed bytes (default 1 = every byte).

    Returns:
        A new ``bytearray`` of the same length as *data*.
    """
    buf = bytearray(data)
    for index in range(offset, len(buf), step):
        buf[index] ^= xor_byte
    return buf
def slice_messages(messages, slice_size):
    """Concatenate *messages* and re-split the stream into fixed-size pieces.

    Message boundaries are ignored: the pieces are consecutive
    ``slice_size``-byte chunks of the joined stream, the last one possibly
    shorter.

    Args:
        messages: Iterable of bytes-like objects.
        slice_size: Length of each piece in bytes; must be at least 1.

    Returns:
        List of ``bytes`` pieces; empty when there is nothing to send.

    Raises:
        ValueError: If ``slice_size`` is less than 1 (a non-positive size
            could never consume the stream).
    """
    if slice_size < 1:
        raise ValueError(
            "slice_size must be at least 1, got {!r}".format(slice_size)
        )
    stream = b"".join(messages)
    return [
        stream[start:start + slice_size]
        for start in range(0, len(stream), slice_size)
    ]
def hex_encode_uid(uid):
    """Return the decimal text of *uid* hex-encoded, as ``bytes``.

    The uid is first rendered as a decimal ASCII string and that string is
    then hex-encoded, e.g. ``1000`` -> ``b'31303030'``.

    Args:
        uid: Numeric user id (anything whose ``str()`` is pure ASCII).

    Returns:
        Lower-case hexadecimal ``bytes``.
    """
    return binascii.hexlify(bytes(str(uid), 'ASCII'))
# This program's effective user id, hex-encoded
hex_uid = hex_encode_uid(uid=os.geteuid())
# Byte sequences replayed to the proxy, in order.  Recorded from dbus-send session.
# The first three entries are text lines ending in CRLF; the last two are
# binary messages, where the trailing comment on each row shows that row's
# bytes as ASCII ('.' for non-printable bytes).
MESSAGES = [
# A single NUL byte followed by the AUTH EXTERNAL line carrying hex_uid
bytes.fromhex('00') + b'AUTH EXTERNAL ' + hex_uid + b'\r\n',
b'NEGOTIATE_UNIX_FD\r\n',
b'BEGIN\r\n',
# Binary message naming /org/freedesktop/DBus, org.freedesktop.DBus and
# "Hello" -- presumably the Hello method call; confirm against the D-Bus spec
(
bytes.fromhex('6c 01 00 01 00 00 00 00 01 00 00 00 6e 00 00 00') + # l...........n...
bytes.fromhex('01 01 6f 00 15 00 00 00 2f 6f 72 67 2f 66 72 65') + # ..o...../org/fre
bytes.fromhex('65 64 65 73 6b 74 6f 70 2f 44 42 75 73 00 00 00') + # edesktop/DBus...
bytes.fromhex('06 01 73 00 14 00 00 00 6f 72 67 2e 66 72 65 65') + # ..s.....org.free
bytes.fromhex('64 65 73 6b 74 6f 70 2e 44 42 75 73 00 00 00 00') + # desktop.DBus....
bytes.fromhex('02 01 73 00 14 00 00 00 6f 72 67 2e 66 72 65 65') + # ..s.....org.free
bytes.fromhex('64 65 73 6b 74 6f 70 2e 44 42 75 73 00 00 00 00') + # desktop.DBus....
bytes.fromhex('03 01 73 00 05 00 00 00 48 65 6c 6c 6f 00 00 00') # ..s.....Hello...
),
# Binary message naming /org/freedesktop/DBus, org.freedesktop.DBus and
# "ListNames" -- presumably the ListNames method call; confirm against the spec
(
bytes.fromhex('6c 01 00 01 00 00 00 00 02 00 00 00 75 00 00 00') + # l...........u...
bytes.fromhex('01 01 6f 00 15 00 00 00 2f 6f 72 67 2f 66 72 65') + # ..o...../org/fre
bytes.fromhex('65 64 65 73 6b 74 6f 70 2f 44 42 75 73 00 00 00') + # edesktop/DBus...
bytes.fromhex('02 01 73 00 14 00 00 00 6f 72 67 2e 66 72 65 65') + # ..s.....org.free
bytes.fromhex('64 65 73 6b 74 6f 70 2e 44 42 75 73 00 00 00 00') + # desktop.DBus....
bytes.fromhex('03 01 73 00 09 00 00 00 4c 69 73 74 4e 61 6d 65') + # ..s.....ListName
bytes.fromhex('73 00 00 00 00 00 00 00 06 01 73 00 14 00 00 00') + # s.........s.....
bytes.fromhex('6f 72 67 2e 66 72 65 65 64 65 73 6b 74 6f 70 2e') + # org.freedesktop.
bytes.fromhex('44 42 75 73 00 00 00 00') # DBus....
)
]
def test_partial():
    """Send each proper prefix of MESSAGES over its own connection.

    Prefixes of length 1 up to ``len(MESSAGES) - 1`` are sent, so every
    conversation stops before the final recorded message.
    """
    total = len(MESSAGES)
    for count in range(1, total):
        print("Send {}/{}\n".format(count, total))
        send_to_proxy(MESSAGES[:count])
def test_broken():
    """Send message prefixes whose last message is truncated.

    For each prefix length from 2 up to ``len(MESSAGES) - 1``, the final
    message of the prefix is sent with 1, 6, 11, ... bytes cut off its
    end, each variant over a fresh connection.  At least two bytes of the
    trimmed message always remain.
    """
    # NOTE(review): the range stops before len(MESSAGES), so the final
    # recorded message is never the one being trimmed -- confirm intended.
    for count in range(2, len(MESSAGES)):
        last = MESSAGES[count - 1]
        # The cut offsets must come from the message that is actually
        # trimmed.  Using the length of the *next* message produced cuts
        # longer than the trimmed message, i.e. repeated empty payloads.
        for cut in range(1, len(last) - 1, 5):
            print("{} messages, last trimmed at -{}".format(count, cut))
            subset = MESSAGES[:count]  # slice copy: MESSAGES stays intact
            subset[-1] = last[:-cut]
            send_to_proxy(subset)
def test_garbage():
    """Send message prefixes whose last message has been corrupted.

    For each prefix length from 2 up to ``len(MESSAGES) - 1``, every second
    byte of the prefix's final message, starting at index 4, is XOR-ed
    with 0x55 before the prefix is sent.
    """
    for count in range(2, len(MESSAGES)):
        print("{} messages, last with garbage".format(count))
        subset = MESSAGES[:count]  # slice copy: MESSAGES stays intact
        subset[-1] = make_garbage(subset[-1], 0x55, 4, 2)
        send_to_proxy(subset)
# Can be used to detect memory leaks
def test_reconnecting():
    """Repeatedly connect, send a prefix of MESSAGES and disconnect.

    Every prefix length from 0 (connect and close without sending) up to
    ``len(MESSAGES) - 1`` is sent ``times`` times, each over a fresh
    connection.  Raise ``times`` to put more reconnect load on the proxy.
    """
    times = 1
    for count in range(len(MESSAGES)):
        print("Send {} messages {} times".format(count, times))
        subset = MESSAGES[:count]
        for _ in range(times):
            send_to_proxy(subset)
def test_send_by_pieces():
    """Send the whole recorded conversation in small fixed-size chunks.

    All of MESSAGES is joined into one byte stream and re-split into 1-,
    3-, 5- and 17-byte pieces, so chunk boundaries fall at arbitrary
    positions inside the messages.  Each slice size uses its own
    connection.
    """
    for slice_size in (1, 3, 5, 17):
        print("Send messages in {}-byte slices".format(slice_size))
        send_to_proxy(slice_messages(MESSAGES, slice_size))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment