Skip to content

Instantly share code, notes, and snippets.

@wzr1337
Created December 7, 2023 14:30
Show Gist options
  • Save wzr1337/c79c486e7fd4f9a18167cd237675bc5e to your computer and use it in GitHub Desktop.
Save wzr1337/c79c486e7fd4f9a18167cd237675bc5e to your computer and use it in GitHub Desktop.
tcp tester
import socket
import time
import threading
import argparse
# Constants
BUFFER_SIZE = 1024
MAX_MESSAGE_SIZE = 1400 # 1400 MTU
NUMBER_OF_MESSAGES = 10000
TICK_MILLISECONDS = 1 # Adjust the interval as needed
# Lorem Ipsum string with 32 KB size
STRING_TO_SEND = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. " * 512 # 32 KB
def sender(ip_address, port):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sender_socket:
sender_socket.connect((ip_address, port))
message_counter = 0
for _ in range(NUMBER_OF_MESSAGES):
message_counter += 1
message = f"{message_counter}: {STRING_TO_SEND}"
# Check if the message is larger than 1400 bytes
if len(message) > MAX_MESSAGE_SIZE:
chunks = [message[i:i + MAX_MESSAGE_SIZE] for i in range(0, len(message), MAX_MESSAGE_SIZE)]
else:
chunks = [message]
for chunk in chunks:
sender_socket.sendall(chunk.encode())
time.sleep(TICK_MILLISECONDS / 1000) # Convert milliseconds to seconds
print("Sent", message_counter, "messages")
def receiver(ip_address, port):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as receiver_socket:
receiver_socket.bind((ip_address, port))
receiver_socket.listen()
print("Waiting for a connection...")
conn, addr = receiver_socket.accept()
print("Connected by", addr)
rcvd_message_counter = 0
while True:
data = conn.recv(BUFFER_SIZE)
rcvd_message_counter += 1
if not data:
break
# print("Received:", data.decode())
print("Received", rcvd_message_counter, "chunks")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="TCP Sender and Receiver")
parser.add_argument("mode", choices=["sender", "receiver"], default="sender", help="Choose 'sender' or 'receiver'")
parser.add_argument("ip_address", nargs="?", default="127.0.0.1", help="IP address to bind or connect to")
parser.add_argument("port", nargs="?", type=int, default=30490, help="Port number to bind or connect to")
args = parser.parse_args()
if args.mode == "sender":
sender(args.ip_address, args.port)
elif args.mode == "receiver":
receiver(args.ip_address, args.port)
else:
print("Invalid mode. Use 'sender' or 'receiver'.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment