Created
March 22, 2020 13:41
-
-
Save codedokode/12d852f999c35fd73ff892a3b2c19279 to your computer and use it in GitHub Desktop.
Test Python script for xdg-dbus-proxy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import socket | |
import sys | |
import binascii | |
from pprint import pprint | |
# Tests xdg-dbus-proxy by sending different messages to check whether it | |
# would crash or not. | |
# | |
# How to use: | |
# | |
# Compile debug version of xdg-dbus-proxy with address sanitizer | |
# (make -e CFLAGS="-g -O0 -fsanitize=address" LDFLAGS="-fsanitize=address") | |
# | |
# Start it, e.g. | |
# | |
# xdg-dbus-proxy "$DBUS_SESSION_BUS_ADDRESS" /tmp/test1 --log | |
# | |
# | |
# Then run this script using pytest: | |
# | |
# SOCKET_PATH=/tmp/test1 pytest-3 test-xdg-proxy.py | |
# | |
SOCKET_PATH = os.getenv('SOCKET_PATH') | |
def send_to_proxy(messages): | |
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | |
sock.connect(SOCKET_PATH) | |
for message in messages: | |
sock.setblocking(True) | |
dump_message('SEND', message) | |
try: | |
sock.sendall(message) | |
except BrokenPipeError: | |
print("Proxy has disconnected") | |
return | |
sock.setblocking(False) | |
try: | |
(data, ancdata, msg_flags, address) = sock.recvmsg(16384) | |
dump_message('RECV', data) | |
dump_ancdata(ancdata) | |
except BlockingIOError: | |
pass | |
# (data, ancdata, msg_flags, address) = sock.recvmsg(16384) | |
# dump_message('RECV', data) | |
# dump_ancdata(ancdata) | |
sock.close() | |
def dump_message(type, data): | |
fmt = '{:8} {:48} {:16}' | |
for line in range(0, len(data), 16): | |
row = data[line:line+16] | |
as_bytes = ["{:02x}".format(b) for b in row] | |
as_chars = [chr(ch) if ch >= ord(' ') and ch <= 0x7f else '.' for ch in row] | |
print(fmt.format( | |
type if line == 0 else '', | |
" ".join(as_bytes), | |
"".join(as_chars) | |
)) | |
print("") | |
def dump_ancdata(ancdata): | |
pprint(ancdata) | |
def make_garbage(data, xor_byte, offset, step = 1): | |
buf = bytearray(data) | |
for i in range(offset, len(data), step): | |
buf[i] = buf[i] ^ xor_byte | |
return buf | |
def slice_messages(messages, slice_size): | |
message = b"".join(messages) | |
pieces = [] | |
offset = 0 | |
while offset < len(message): | |
piece = message[offset:offset + slice_size] | |
pieces.append(piece) | |
offset += slice_size | |
return pieces | |
def hex_encode_uid(uid): | |
return binascii.hexlify(bytes(str(uid), 'ASCII')) | |
# Ths program's user id as HEX | |
hex_uid = hex_encode_uid(os.geteuid()) | |
# Recorded from dbus-send session | |
MESSAGES = [ | |
# First byte | |
bytes.fromhex('00') + b'AUTH EXTERNAL ' + hex_uid + b'\r\n', | |
b'NEGOTIATE_UNIX_FD\r\n', | |
b'BEGIN\r\n', | |
( | |
bytes.fromhex('6c 01 00 01 00 00 00 00 01 00 00 00 6e 00 00 00') + # l...........n... | |
bytes.fromhex('01 01 6f 00 15 00 00 00 2f 6f 72 67 2f 66 72 65') + # ..o...../org/fre | |
bytes.fromhex('65 64 65 73 6b 74 6f 70 2f 44 42 75 73 00 00 00') + # edesktop/DBus... | |
bytes.fromhex('06 01 73 00 14 00 00 00 6f 72 67 2e 66 72 65 65') + # ..s.....org.free | |
bytes.fromhex('64 65 73 6b 74 6f 70 2e 44 42 75 73 00 00 00 00') + # desktop.DBus.... | |
bytes.fromhex('02 01 73 00 14 00 00 00 6f 72 67 2e 66 72 65 65') + # ..s.....org.free | |
bytes.fromhex('64 65 73 6b 74 6f 70 2e 44 42 75 73 00 00 00 00') + # desktop.DBus.... | |
bytes.fromhex('03 01 73 00 05 00 00 00 48 65 6c 6c 6f 00 00 00') # ..s.....Hello... | |
), | |
( | |
bytes.fromhex('6c 01 00 01 00 00 00 00 02 00 00 00 75 00 00 00') + # l...........u... | |
bytes.fromhex('01 01 6f 00 15 00 00 00 2f 6f 72 67 2f 66 72 65') + # ..o...../org/fre | |
bytes.fromhex('65 64 65 73 6b 74 6f 70 2f 44 42 75 73 00 00 00') + # edesktop/DBus... | |
bytes.fromhex('02 01 73 00 14 00 00 00 6f 72 67 2e 66 72 65 65') + # ..s.....org.free | |
bytes.fromhex('64 65 73 6b 74 6f 70 2e 44 42 75 73 00 00 00 00') + # desktop.DBus.... | |
bytes.fromhex('03 01 73 00 09 00 00 00 4c 69 73 74 4e 61 6d 65') + # ..s.....ListName | |
bytes.fromhex('73 00 00 00 00 00 00 00 06 01 73 00 14 00 00 00') + # s.........s..... | |
bytes.fromhex('6f 72 67 2e 66 72 65 65 64 65 73 6b 74 6f 70 2e') + # org.freedesktop. | |
bytes.fromhex('44 42 75 73 00 00 00 00') # DBus.... | |
) | |
] | |
def test_partial(): | |
for i in range(1, len(MESSAGES)): | |
print("Send {}/{}\n".format(i, len(MESSAGES))) | |
send_to_proxy(MESSAGES[0:i]) | |
def test_broken(): | |
for i in range(2, len(MESSAGES)): | |
for cut in range(1, len(MESSAGES[i]) - 1, 5): | |
print("{} messages, last trimmed at -{}".format(i, cut)) | |
subset = MESSAGES[0:i] | |
subset[-1] = subset[-1][:-cut] | |
send_to_proxy(subset) | |
def test_garbage(): | |
for i in range (2, len(MESSAGES)): | |
print("{} messages, last with garbage".format(i)) | |
subset = MESSAGES[0:i] | |
subset[-1] = make_garbage(subset[-1], 0x55, 4, 2) | |
send_to_proxy(subset) | |
# Can be used to detect memory leaks | |
def test_reconnecting(): | |
times = 1 | |
for i in range(0, len(MESSAGES)): | |
print("Send {} messages {} times".format(i, times)) | |
subset = MESSAGES[0:i] | |
for j in range(0, times): | |
send_to_proxy(subset) | |
def test_send_by_pieces(): | |
for slice_size in [1, 3, 5, 17]: | |
print("Send messages in {}-byte slices".format(slice_size)) | |
sliced = slice_messages(MESSAGES, slice_size) | |
send_to_proxy(sliced) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment