Created
December 7, 2023 14:30
-
-
Save wzr1337/c79c486e7fd4f9a18167cd237675bc5e to your computer and use it in GitHub Desktop.
tcp tester
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import time | |
import threading | |
import argparse | |
# Constants | |
BUFFER_SIZE = 1024 | |
MAX_MESSAGE_SIZE = 1400 # 1400 MTU | |
NUMBER_OF_MESSAGES = 10000 | |
TICK_MILLISECONDS = 1 # Adjust the interval as needed | |
# Lorem Ipsum string with 32 KB size | |
STRING_TO_SEND = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. " * 512 # 32 KB | |
def sender(ip_address, port): | |
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sender_socket: | |
sender_socket.connect((ip_address, port)) | |
message_counter = 0 | |
for _ in range(NUMBER_OF_MESSAGES): | |
message_counter += 1 | |
message = f"{message_counter}: {STRING_TO_SEND}" | |
# Check if the message is larger than 1400 bytes | |
if len(message) > MAX_MESSAGE_SIZE: | |
chunks = [message[i:i + MAX_MESSAGE_SIZE] for i in range(0, len(message), MAX_MESSAGE_SIZE)] | |
else: | |
chunks = [message] | |
for chunk in chunks: | |
sender_socket.sendall(chunk.encode()) | |
time.sleep(TICK_MILLISECONDS / 1000) # Convert milliseconds to seconds | |
print("Sent", message_counter, "messages") | |
def receiver(ip_address, port): | |
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as receiver_socket: | |
receiver_socket.bind((ip_address, port)) | |
receiver_socket.listen() | |
print("Waiting for a connection...") | |
conn, addr = receiver_socket.accept() | |
print("Connected by", addr) | |
rcvd_message_counter = 0 | |
while True: | |
data = conn.recv(BUFFER_SIZE) | |
rcvd_message_counter += 1 | |
if not data: | |
break | |
# print("Received:", data.decode()) | |
print("Received", rcvd_message_counter, "chunks") | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser(description="TCP Sender and Receiver") | |
parser.add_argument("mode", choices=["sender", "receiver"], default="sender", help="Choose 'sender' or 'receiver'") | |
parser.add_argument("ip_address", nargs="?", default="127.0.0.1", help="IP address to bind or connect to") | |
parser.add_argument("port", nargs="?", type=int, default=30490, help="Port number to bind or connect to") | |
args = parser.parse_args() | |
if args.mode == "sender": | |
sender(args.ip_address, args.port) | |
elif args.mode == "receiver": | |
receiver(args.ip_address, args.port) | |
else: | |
print("Invalid mode. Use 'sender' or 'receiver'.") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment