Last active
January 12, 2021 09:44
-
-
Save mmmunk/69129fb1053638a0efa614f7c7aebc72 to your computer and use it in GitHub Desktop.
Simple UDP packet commands from one program to another
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# UDP command server.
#
# Usage: python <this script> <bind_ip> <bind_port>
#
# Waits for 20-byte command packets and answers each one with a result packet
# that echoes the command number and unique ID. All integers on the wire are
# little-endian.
#
# Command packet (20 bytes):
#   bytes  0-3   packet type, unsigned, must be 1
#   bytes  4-7   command number, unsigned
#   bytes  8-11  unique ID, unsigned
#   bytes 12-19  argument, signed
#
# Result packet (17 bytes + text):
#   bytes  0-3   packet type, unsigned, always 2
#   bytes  4-7   command number, unsigned (copied from the command)
#   bytes  8-11  unique ID, unsigned (copied from the command)
#   bytes 12-15  result code, signed
#   byte  16     length of the result text in bytes
#   bytes 17-    result text, ASCII
import sys
import socket
import random

PACKET_TYPE_COMMAND = 1
PACKET_TYPE_RESULT = 2
COMMAND_PACKET_SIZE = 20

# (result code, result text) pairs; one is picked at random for every command.
POSSIBLE_RESULTS = [(-1, 'Error'), (0, 'OK'), (1, 'Yes'), (2, 'No'), (3, 'Maybe or not')]


def parse_command(data):
    """Split a 20-byte command packet into its fields.

    Args:
        data: The raw packet bytes; must be exactly 20 bytes long.

    Returns:
        A tuple (packet_type, command_number, unique_id, argument).

    Raises:
        ValueError: If data is not exactly 20 bytes long.
    """
    if len(data) != COMMAND_PACKET_SIZE:
        raise ValueError('Packet size is wrong for a command')
    packet_type = int.from_bytes(data[0:4], 'little', signed=False)
    command_number = int.from_bytes(data[4:8], 'little', signed=False)
    unique_id = int.from_bytes(data[8:12], 'little', signed=False)
    argument = int.from_bytes(data[12:20], 'little', signed=True)
    return packet_type, command_number, unique_id, argument


def build_result(command_number, unique_id, result_code, result_text):
    """Build the bytes of a result packet.

    Args:
        command_number: Command number to echo back (0 .. 2**32 - 1).
        unique_id: Unique ID to echo back (0 .. 2**32 - 1).
        result_code: Signed 32-bit result code.
        result_text: ASCII text appended to the packet; at most 255 bytes.

    Returns:
        The encoded result packet as bytes.

    Raises:
        OverflowError: If a number does not fit its field or the text is
            longer than 255 bytes.
        UnicodeEncodeError: If result_text is not pure ASCII.
    """
    text_bytes = result_text.encode('ascii')
    return b''.join([
        PACKET_TYPE_RESULT.to_bytes(4, 'little', signed=False),
        command_number.to_bytes(4, 'little', signed=False),
        unique_id.to_bytes(4, 'little', signed=False),
        result_code.to_bytes(4, 'little', signed=True),
        # The length byte counts encoded bytes, which is what the receiver slices.
        len(text_bytes).to_bytes(1, 'little', signed=False),
        text_bytes,
    ])


def main():
    """Bind the UDP socket named on the command line and answer commands forever."""
    bind_ip = sys.argv[1]
    bind_port = int(sys.argv[2])

    # The context manager closes the socket when the loop is left through an
    # exception (for example Ctrl-C).
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as msgsock:
        msgsock.bind((bind_ip, bind_port))
        print('Waiting for commands on', bind_ip, 'port', bind_port)

        while True:
            # Blocking wait to receive a UDP packet
            data, address = msgsock.recvfrom(2048)
            print('Received', data, 'from', address)

            if len(data) != COMMAND_PACKET_SIZE:
                print('Packet size is wrong for a command')
                continue

            packet_type, command_number, unique_id, argument = parse_command(data)
            print('Packet type is', packet_type)
            if packet_type != PACKET_TYPE_COMMAND:
                print('Packet type is not a command')
                continue

            print('Command number =', command_number, '- Unique ID =', unique_id, '- argument =', argument)

            # Answer with a randomly chosen result that echoes command number and unique ID
            result_code, result_text = random.choice(POSSIBLE_RESULTS)
            result_data = build_result(command_number, unique_id, result_code, result_text)

            print('Sent', result_data, 'to', address)
            msgsock.sendto(result_data, address)


if __name__ == '__main__':
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# UDP command client.
#
# Usage: python <this script> <command_ip> <command_port> <command_number> <argument>
#
# Sends one 20-byte command packet and waits for the matching result packet.
# All integers on the wire are little-endian.
#
# Command packet (20 bytes):
#   bytes  0-3   packet type, unsigned, always 1
#   bytes  4-7   command number, unsigned
#   bytes  8-11  unique ID, unsigned (random per invocation)
#   bytes 12-19  argument, signed
#
# Result packet (17 bytes + text):
#   bytes  0-3   packet type, unsigned, must be 2
#   bytes  4-7   command number, unsigned (must match the command)
#   bytes  8-11  unique ID, unsigned (must match the command)
#   bytes 12-15  result code, signed
#   byte  16     length of the result text in bytes
#   bytes 17-    result text
import sys
import socket
import random

PACKET_TYPE_COMMAND = 1
PACKET_TYPE_RESULT = 2
RESULT_HEADER_SIZE = 17

# UDP gives no delivery guarantee, so never wait for a reply indefinitely.
REPLY_TIMEOUT_SECONDS = 5.0


def build_command(command_number, unique_id, argument):
    """Build the bytes of a command packet.

    Args:
        command_number: Command number (0 .. 2**32 - 1).
        unique_id: ID used to match the reply to this command (0 .. 2**32 - 1).
        argument: Signed 64-bit command argument.

    Returns:
        The encoded 20-byte command packet.

    Raises:
        OverflowError: If a value does not fit its field.
    """
    return b''.join([
        PACKET_TYPE_COMMAND.to_bytes(4, 'little', signed=False),
        command_number.to_bytes(4, 'little', signed=False),
        unique_id.to_bytes(4, 'little', signed=False),
        argument.to_bytes(8, 'little', signed=True),
    ])


def check_result(result, command_number, unique_id):
    """Validate a received result packet against the command that was sent.

    Args:
        result: The raw bytes of the received packet.
        command_number: Command number that was sent.
        unique_id: Unique ID that was sent.

    Returns:
        None if the packet is a well-formed result for this command,
        otherwise a message describing the first problem found.
    """
    if len(result) < RESULT_HEADER_SIZE:
        return 'Packet size is wrong for a result'
    if int.from_bytes(result[0:4], 'little', signed=False) != PACKET_TYPE_RESULT:
        return 'Packet type is not a result'
    if int.from_bytes(result[4:8], 'little', signed=False) != command_number:
        return 'Command numbers are not identical'
    if int.from_bytes(result[8:12], 'little', signed=False) != unique_id:
        return 'Unique IDs are not identical'
    # The length byte must not promise more text than actually arrived;
    # otherwise the text would be silently truncated.
    if len(result) < RESULT_HEADER_SIZE + result[16]:
        return 'Packet size is wrong for a result'
    return None


def parse_result(result):
    """Extract the payload of a result packet already accepted by check_result.

    Args:
        result: The raw bytes of the result packet.

    Returns:
        A tuple (result_code, result_str_len, result_str).
    """
    result_code = int.from_bytes(result[12:16], 'little', signed=True)
    result_str_len = result[16]
    result_str = result[17:17 + result_str_len].decode('cp1252', 'ignore')
    return result_code, result_str_len, result_str


def main():
    """Send the command given on the command line and print the result.

    Exits with status 1 if no valid result is received.
    """
    command_ip = sys.argv[1]
    command_port = int(sys.argv[2])
    command_number = int(sys.argv[3])
    argument = int(sys.argv[4])

    unique_id = random.randrange(4200000000)
    command = build_command(command_number, unique_id, argument)

    # The context manager closes the socket on every exit path, including sys.exit().
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as msgsock:
        msgsock.settimeout(REPLY_TIMEOUT_SECONDS)

        print('Sent', command, 'to', command_ip, 'port', command_port)
        msgsock.sendto(command, (command_ip, command_port))

        # Wait for result (answer)
        try:
            result, address = msgsock.recvfrom(2048)
        except socket.timeout:
            print('No result received within', REPLY_TIMEOUT_SECONDS, 'seconds')
            sys.exit(1)

    print('Received', result, 'from', address)

    # Check sender port
    if address[1] != command_port:
        print('Packet is not sent from command port')
        sys.exit(1)

    problem = check_result(result, command_number, unique_id)
    if problem is not None:
        print(problem)
        sys.exit(1)

    result_code, result_str_len, result_str = parse_result(result)
    print('Result code:', result_code)
    print('Result string length:', result_str_len)
    print('Result string:', result_str)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment