Skip to content

Instantly share code, notes, and snippets.

@erikson1970
Last active November 12, 2024 22:16
Show Gist options
  • Save erikson1970/fba13fe1b337ed5b623341d84afc510f to your computer and use it in GitHub Desktop.
Save erikson1970/fba13fe1b337ed5b623341d84afc510f to your computer and use it in GitHub Desktop.
Send and receive UDP packets in pure Python.
# /bin/python
# -*- coding: utf-8 -*-
# send_random_udp.py
# send and receive random data over udp for testing
# usage: send_random_udp.py [-h] {send,recv} ...
# positional arguments:
# {send,recv} send or receive data
# optional arguments:
# -h, --help show this help message and exit
#
# MIT License
#
# Copyright (c) 2024 @erikson1970 on Github
import socket
import argparse
from time import sleep
from datetime import datetime
import sys
import re
import struct
import json
from random import seed
from zlib import decompress, compress
wordsStore = []
try:
    # Preferred source of random sentences when the optional package is present.
    from essential_generators import DocumentGenerator

    docGenSource = "`essential_generators` Sentence Generator"
    # Debug aid: raise ImportError inside this try to exercise the fallback below.
except ImportError:
    from random import randint
    from base64 import b64decode

    docGenSource = "Internal Sentence Generator"

    class DocumentGenerator:
        """Minimal stand-in exposing the one method this script needs: sentence().

        The word table ships inside the file as base64-encoded, zlib-compressed
        UTF-8 text: one row per line, words within a row separated by single
        spaces.
        """

        wordsStore = []

        def __init__(self):
            packed_table = """eJw9VMuS3DYMvOsr8Cuz6y07W2PHFVflkBtEYSR6SIIGyXnk69OQ1qk5aFRoNYBGAyc2LfSWMxOnmbtpa8RdaOaC33SaJdGfKd6i
A1JcuatRUL3KQnPicJ3F7Amc8caZTjfHVQ5Ms8a0g4Z8YBYA/mgMygRU4aBlYbpYdJgJLxcbsQNnkQv90Lp51tJRjKcL+MtJR5Xp
lOTBZRGjrwdEULNRFfGUYTvylcXkTq8bW9KOnhyXtKI5HevWgYtoCbi+aXnSKUtyNsu8IKDUNIHMuHzUb05MbzdJz4KalhVvYYBn
GEAo3Po2jE4zZIrpf8SmtaKqxXjV8tFic6YvbBWPWfiGB+LlSpe4TqfRevSpxPSkOTZMqMWdY1Vtv9V8YDJvKf4LPfvmJJ5K7o4y
hkYvUn5yBs8J7OAZlwt7U12THJCjlpcEdaH2Bb0H7tQyozqwDL7x9GK8JHkiE2YWTO/UN1cVWmlB1Cf1FXp5kQubT2E1HWWha7zH
I8MrJ5npR8AcpHdadKXOrSPHdeRfg3fEDZX+HQPshTqEVxTZKspJkpHolfMiaGUPJqnbbgvxeMziYXEjfzYOQhd90CUxRl6QIz0h
CyCbRfReXfjXLakcYkIXa50y3MQWi0yfuEQoC8aIjuGdHeJEwKyK+C0u9B3Uu5VQHjReqUF/YAq0i9OnZ4Isf0UXLkUf3x3Loz6b
IqEfiTC7n7zRmQFGkeUBBxxOqVz5ydNbR5d0do/imV1c2M6wekC0Fn+b6TPP5iV/U2PSig0emT6GWIXDNn2RAgf8oyjHF8GoA3RE
bcJGcsAeoYoaV1qwBDDMHrUWs6v/jj2n08COPqnCPm6TeYegEa41iUN0hqNLQU9VLQyPwQvwSk0jA5CleTdPMnwOYW+ye6BqFtix
YIOnd90w5OWwExqQ6rfGd/rXiCU4oGH4cKNwwSFq14idCc4F1laP1Xgfu2LfGOciwkb41gwCQbv9JDVM4JDuHHGSzqMwQZXiW9H3
uPH9oDrr6qcIm7A3Bnd2jc37wlgA7Yy5+LGYziNwoxfDafTz4APnGWuXBHcKQM04ndN3cfnPAq47kEEhbI4PAO5+wfat+g8M6Ats"""
            table_text = decompress(b64decode(packed_table)).decode("utf-8")
            self.wordsStore = [row.split(" ") for row in table_text.split("\n")]

        def sentence(self):
            """Return one random sentence assembled from the word table.

            Shape: "<lead> the <col 2> <col 3> <tail>." where <lead> is one or
            two words drawn from columns 0-1 joined by " and ", and <tail> is
            one or two "a <col 4>" phrases joined by " and ". Rows are always
            picked from indices 0..39.
            """
            table = self.wordsStore

            # The random draws below happen in a fixed order so that a seeded
            # run reproduces the same sentences.
            lead_words = []
            for _ in range(randint(1, 2)):
                row = table[randint(0, 39)]
                lead_words.append(row[randint(0, 1)])

            third_col = table[randint(0, 39)][2]
            fourth_col = table[randint(0, 39)][3]

            tail_phrases = []
            for _ in range(randint(1, 2)):
                tail_phrases.append("a " + table[randint(0, 39)][4])

            return "{} the {} {} {}.".format(
                " and ".join(lead_words),
                third_col,
                fourth_col,
                " and ".join(tail_phrases),
            )
# Shell tab-completion is an optional extra: when the package is missing the
# name is bound to None so later code can test it with a plain truthiness check.
try:
    import argcomplete
except ImportError:
    argcomplete = None
def _version():
return "0.1"
# strftime-style pattern: day-of-year:hour:minute:second.microsecond, followed
# by one trailing space.
# NOTE(review): nothing in the visible part of this file references this name;
# confirm it is unused before removing it.
timefmt = "%j:%H:%M:%S.%f "
def is_multicast_address(ip_address):
    """Return True if ip_address is a valid IPv4 multicast address.

    The multicast range is 224.0.0.0 through 239.255.255.255 (224.0.0.0/4).
    Unlike a digit-count regex, the address is actually parsed, so strings
    such as "224.999.1.1" or "239.1.2.3\\n" are rejected.

    Args:
        ip_address: dotted-quad IPv4 address as a string.

    Returns:
        True for a well-formed address inside 224.0.0.0/4, otherwise False.

    Raises:
        TypeError: if ip_address is not a string.
    """
    # Function-scope import keeps this block self-contained.
    import ipaddress

    if not isinstance(ip_address, str):
        raise TypeError("expected string for ip_address")
    try:
        return ipaddress.IPv4Address(ip_address).is_multicast
    except ValueError:
        # AddressValueError is a ValueError: not a well-formed IPv4 address.
        return False
def is_administratively_scoped_multicast(ip_address):
    """Return True if ip_address is in the local-scope multicast block 239.0.0.0/8.

    The address is parsed rather than pattern-matched, so strings with
    out-of-range octets (e.g. "239.300.1.1") or a trailing newline are
    rejected.

    Args:
        ip_address: dotted-quad IPv4 address as a string.

    Returns:
        True for a well-formed address whose first octet is 239, otherwise
        False.

    Raises:
        TypeError: if ip_address is not a string.
    """
    # Function-scope import keeps this block self-contained.
    import ipaddress

    if not isinstance(ip_address, str):
        raise TypeError("expected string for ip_address")
    try:
        parsed = ipaddress.IPv4Address(ip_address)
    except ValueError:
        # AddressValueError is a ValueError: not a well-formed IPv4 address.
        return False
    return parsed in ipaddress.IPv4Network("239.0.0.0/8")
def create_parser(description):
    """Build the command-line parser with its `send` and `recv` sub-commands.

    The resulting namespace always carries a `command` attribute: "send" or
    "recv" for the matching sub-command, or "empty" when none was given.

    Args:
        description: text shown at the top of the --help output.

    Returns:
        The fully configured argparse.ArgumentParser.
    """
    p = argparse.ArgumentParser(description=description)
    subparsers = p.add_subparsers()
    # Marker value so the caller can tell that no sub-command was supplied.
    p.set_defaults(command="empty")

    # --- options that only make sense when sending -----------------------
    sender = subparsers.add_parser("send", help="send text data over udp ")
    sender.add_argument(
        "-d", "--dest", default="127.0.0.1", help="destination address "
    )
    sender.add_argument("-m", "--message", default=None, help="fixed Message to send ")
    sender.add_argument(
        "-i",
        "--input",
        default="*random*",
        help='input file or source if any (use "-" for stdin).'
        + " [default is internally generated random sentence] ",
    )
    sender.add_argument(
        "-e", "--seed", type=int, default=-1, help="Set the random Seed [-1]"
    )
    sender.add_argument(
        "-n",
        "--null",
        action="store_true",
        default=False,
        help="Null Terminate the UDP stream",
    )
    sender.add_argument(
        "-l",
        "--length",
        type=int,
        default=-1,
        help="max byte length to send [-1 : none]",
    )
    sender.add_argument(
        "-s",
        "--sleep",
        type=float,
        default=1.0,
        help="Average sleep time between transmissions [1.0 sec]",
    )
    sender.set_defaults(command="send")

    # --- options that only make sense when receiving ---------------------
    receiver = subparsers.add_parser("recv", help="receive text data over udp ")
    receiver.add_argument("-o", "--output", help='output file (use "-" for stdout).')
    receiver.add_argument(
        "-b", "--bind", default="0.0.0.0", help="bind address [0.0.0.0]"
    )
    receiver.add_argument(
        "-x",
        "--hex",
        default=False,
        action="store_true",
        help="pretty hex output",
    )
    receiver.add_argument(
        "-j",
        "--json",
        default=False,
        action="store_true",
        help="parse received packets as json",
    )
    receiver.set_defaults(command="recv")

    # --- options shared by both sub-commands -----------------------------
    for sub in subparsers.choices.values():
        sub.add_argument(
            "-c", "--count", type=int, default=3, help="number to send/receive [3]"
        )
        sub.add_argument(
            "-p",
            "--port",
            type=int,
            default=4321,
            help=" destination/receiving port [4321]",
        )
        sub.add_argument(
            "-g",
            "--group",
            default="0.0.0.0",
            help="multicast group [0.0.0.0 = unicast]",
        )
        sub.add_argument(
            "-t",
            "--time",
            action="store_true",
            default=False,
            help="time tags added to output [False]",
        )
        sub.add_argument(
            "-z",
            "--zlib",
            default=False,
            action="store_true",
            help="Use zlib to compress/decompress data.",
        )
        # -v may be repeated to raise verbosity; it cannot be combined with -q.
        g = sub.add_mutually_exclusive_group()
        g.add_argument("-v", "--verbose", default=0, action="count")
        g.add_argument("-q", "--quiet", default=False, action="store_true")

    # Hook up tab-completion only once every option exists: when the shell is
    # asking for completions, autocomplete() answers and exits on the spot, so
    # anything registered after this call would never be offered.
    if argcomplete:
        argcomplete.autocomplete(p)
    return p
def getter(count, filename="*random*", message=None):
    """Generate the messages to be sent, one string per iteration.

    Sources, chosen by `filename`:
      * "-": every line read from stdin until EOF (or Ctrl-C).
      * any other name except "*random*": every line of that text file.
      * "*random*" (default): `count` generated messages, driven by `message`:
          - None: a random sentence from DocumentGenerator.
          - "JSON:a:b:...": a JSON object with one entry per colon-separated
            field; "t" becomes a timestamp, "g" a generated sentence, "c" the
            remaining count, anything else is passed through literally.
          - "SUBS:text": `text` with %t, %g, %c and %r replaced by a timestamp,
            a quoted generated sentence, the remaining count and a random
            integer in 0..65535.
          - anything else: the message itself, unchanged.

    Args:
        count: number of generated messages; a negative value never reaches
            zero and therefore generates forever. Not applied to file or
            stdin input, which always run to end of input.
        filename: input source selector as described above.
        message: fixed message or JSON:/SUBS: template (generated mode only).

    Yields:
        str: the next message. Lines read from a file or stdin keep their
        trailing newline.
    """
    # The module only imports randint in its fallback-generator branch, so
    # import it here to keep the %r substitution working in every setup.
    from random import randint

    if filename != "*random*":
        if filename == "-":
            try:
                # The "" sentinel ends the iteration at EOF.
                for line in iter(sys.stdin.readline, ""):
                    yield line
            except KeyboardInterrupt:
                # Ctrl-C is the normal way to end interactive input.
                pass
            finally:
                sys.stdout.flush()
        else:
            with open(filename, "r") as f:
                yield from f
        return

    if message is None:
        gen = DocumentGenerator()
        getMessage = gen.sentence
    elif message.startswith("JSON:"):
        gen = DocumentGenerator()

        def getMessage():
            # One entry per field after the "JSON:" prefix; the field's
            # position is part of the key so that keys stay unique.
            fields = {}
            for ii, kk in enumerate(message.split(":")[1:]):
                if kk == "t":
                    fields[f"time-{ii}"] = f"{datetime.now().timestamp()} "
                elif kk == "g":
                    fields[f"gen-{ii}"] = gen.sentence()
                elif kk == "c":
                    fields[f"count-{ii}"] = count
                else:
                    fields[f"msg-{ii}"] = kk
            return json.dumps(fields)

    elif message.startswith("SUBS:"):
        gen = DocumentGenerator()

        def getMessage():
            return (
                message[5:]
                .replace("%t", str(datetime.now().timestamp()))
                .replace("%g", f'"{gen.sentence()}"')
                .replace("%c", str(count))
                .replace("%r", str(randint(0, 65535)))
            )

    else:

        def getMessage():
            return message

    while count != 0:
        # Decrement first: templates see the number of messages still to come.
        count -= 1
        yield getMessage()
def splitter(inputstr, n=-1):
    """Break a message into consecutive pieces of at most n items.

    Works on anything sliceable with a length (str, bytes, bytearray).

    Args:
        inputstr: the message to split.
        n: maximum piece length; any value below 1 disables splitting.

    Yields:
        The pieces in order. Exactly one piece is always produced for empty
        input, so an empty message is handled the same way whether or not
        splitting is enabled.
    """
    if n < 1 or not inputstr:
        yield inputstr
        return
    for start in range(0, len(inputstr), n):
        yield inputstr[start : start + n]
def _main():
    """Command-line entry point: send or receive UDP test datagrams.

    Parses the arguments, opens one UDP socket (joining the multicast group
    when -g is given) and then either

      * send: transmits the messages produced by getter(), optionally split
        into pieces of at most --length bytes and/or zlib-compressed, or
      * recv: binds to the port and prints --count received datagrams as a
        bytes repr, parsed JSON or a hex dump.

    Exits with status 0 after printing help when no sub-command was given and
    with status 1 when the multicast group is invalid.
    """
    print(f"Using {docGenSource}")
    description = "UDP Sender v{0:s}: ".format(_version())

    def pphex(data):
        """Format data as a hex dump, 16 bytes per row with the raw bytes alongside."""
        return (
            "0x0000 "
            + "".join(
                "{}{}{}".format(
                    "{0:#0{1}x}".format(ord(xxx) if isinstance(xxx, str) else xxx, 4)[2:],
                    " " if (1 + i) % 8 == 0 else ":",
                    (
                        "\t{} {}\n{} ".format(
                            data[i - 15 : i - 7],
                            data[i - 7 : i + 1],
                            # Label the next row with its own starting offset
                            # (i is the index of the byte that ended this row).
                            "{0:#0{1}x}".format(i + 1, 6),
                        )
                        if (1 + i) % 16 == 0
                        else ""
                    ),
                )
                for i, xxx in enumerate(data)
            )
            # Pad a short final row so the raw-bytes column stays aligned.
            + (" " if len(data) % 16 < 8 else "").join(
                (
                    lambda ii: [
                        ":".join([" "] * ((8 - ii) % 8 if ii < 8 else 0)),
                        ":".join(
                            [" "] * (0 if ii == 0 else (8 - ii) % 8 if ii > 8 else 8)
                        ),
                    ]
                )(len(data) % 16)
            )
            + (
                "\t{} {}".format(
                    data[-(len(data) % 16) :][:8], data[-(len(data) % 16) :][8:16]
                )
                if (len(data) % 16) != 0
                else ""
            )
        )

    p = create_parser(description)
    args = p.parse_args()
    if args.command == "empty":
        p.print_help()
        sys.exit(0)
    elif args.command == "send":
        print("Starting in Send Mode")
    else:
        print("Starting in Receive Mode")
    if args.verbose > 2:
        print(f"command line args = {vars(args)}")
    UDP_PORT = args.port

    def packet_header(remaining, recv_len, full_len):
        """Build the counter / size / timestamp prefix of a receive-mode line."""
        return "".join(
            [
                "{}".format(abs(remaining)) if args.verbose > 0 else "",
                (
                    "/{} ".format(args.count)
                    if args.count > 0 and args.verbose
                    else " "
                ),
                (
                    f"{recv_len:d}/{full_len:d} bytes in "
                    if args.verbose
                    else " "
                ),
                f"{datetime.now().timestamp()} " if args.time else "",
            ]
        )

    # The with-block closes the socket on every exit path, including errors.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as sock:
        if args.group != "0.0.0.0":
            print(f"setting up for multicast group {args.group}:{UDP_PORT} ...", end="")
            UDP_IP = args.group
            if not is_multicast_address(UDP_IP):
                print(f"ERROR: bad Multicast group ==> '{UDP_IP}' is not 239.x.x.x")
                sys.exit(1)
            if not is_administratively_scoped_multicast(UDP_IP):
                print(f"ERROR: Local Multicast only ==> '{UDP_IP}' is not 239.x.x.x")
                sys.exit(1)
            print(f"setting up for multicast group {UDP_IP}:{UDP_PORT}")
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 8)
            # Join the multicast group. "=" selects standard sizes without
            # padding, so the request is exactly 8 bytes (group address plus
            # interface address) whatever the platform's native long is.
            mreq = struct.pack("=4sl", socket.inet_aton(UDP_IP), socket.INADDR_ANY)
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
            print("OK")
        else:
            UDP_IP = args.dest if args.command == "send" else "0.0.0.0"

        if args.command == "send":
            if args.seed != -1:
                print(f"Setting Random Seed to {args.seed}")
                seed(args.seed)
            print("UDP target IP: %s" % UDP_IP)
            print("UDP target port: %s" % UDP_PORT)
            try:
                jj = 0  # datagrams sent so far; drives the progress line breaks
                for i, MESSAGE in enumerate(getter(args.count, args.input, args.message)):
                    if args.time:
                        MESSAGE = f"[{datetime.now().timestamp()},{MESSAGE}]"
                    # Encode before splitting so --length really limits bytes,
                    # not characters (they differ for non-ASCII text).
                    payload = MESSAGE.encode("utf8")
                    for cnt, submsg in enumerate(splitter(payload, args.length)):
                        jj += 1
                        fullDataLen = len(submsg)
                        rawOut = bytearray(submsg)
                        msgOut = compress(rawOut) if args.zlib else rawOut
                        sendDataLen = len(msgOut)
                        sock.sendto(msgOut, (UDP_IP, UDP_PORT))
                        if args.verbose > 0:
                            print(
                                "".join(
                                    [
                                        f"{cnt}/{i}/{args.count} {sendDataLen:d}/{fullDataLen:d} bytes out ",
                                        (
                                            f"{datetime.now().timestamp()} "
                                            if args.time
                                            else ""
                                        ),
                                        f"'{msgOut}'" if args.verbose > 1 else "",
                                    ]
                                )
                            )
                        else:
                            # Compact progress display, five entries per line.
                            print(
                                f"{cnt}/{i}/{args.count}", end=" " if jj % 5 != 0 else "\n"
                            )
                        # Pause after every datagram that goes out.
                        sleep(args.sleep)
            except KeyboardInterrupt:
                # Ctrl-C ends the transmission early; fall through to cleanup.
                pass
            finally:
                if args.null:
                    # A single NUL byte marks the end of the stream.
                    sock.sendto(bytearray([0]), (UDP_IP, UDP_PORT))
        elif args.command == "recv":
            # NOTE(review): the -o/--output option is parsed but nothing here
            # reads it; all output goes to stdout.
            sock.bind((args.bind, UDP_PORT))
            cnt = args.count
            try:
                while cnt != 0:
                    # 65535 covers the largest possible UDP datagram, so
                    # nothing is cut off on receipt.
                    data, _ = sock.recvfrom(65535)
                    recvDataLen = len(data)
                    fullDataLen = recvDataLen
                    try:
                        if args.zlib:
                            # Inside the try: a packet that is not valid zlib
                            # (e.g. the sender's NUL terminator) is reported
                            # as a bad packet instead of ending the receiver.
                            data = decompress(data)
                            fullDataLen = len(data)
                        if args.hex:
                            body = "\n{}".format(pphex(data))
                        elif args.json:
                            body = f"JSON:{json.loads(data)}"
                        else:
                            body = str(data)
                        print(packet_header(cnt, recvDataLen, fullDataLen) + body)
                    except Exception as e:
                        # Best effort: report the problem, dump the packet and
                        # keep listening.
                        print(
                            "".join(
                                [
                                    "BAD PACKET\n",
                                    f"{str(e)}\n",
                                    packet_header(cnt, recvDataLen, fullDataLen),
                                    "\n{}".format(pphex(data)),
                                ]
                            )
                        )
                    cnt -= 1
            except KeyboardInterrupt:
                # Ctrl-C stops listening, mirroring the sender's behaviour.
                pass
# Run the tool only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    _main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment