Last active
November 27, 2024 15:30
-
-
Save fahadysf/9cc2a2022ef5aadaf00db35233ecd1a8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
SCTP Client/Server
Author: Fahad Yousuf
This script implements a simple SCTP client/server using Python's socket module.
By default the server listens on port 62324 and the client connects to the server
on the same port; use -p/--port to select a different port.
""" | |
import socket | |
import sys | |
import os | |
import argparse | |
from datetime import datetime | |
# Default port: the server listens on it and the client connects to it
# unless -p/--port is given.
PORT = 62324
# Address the client connects to when -s/--server_addr is not given.
DEFAULT_SERVER_ADDR = '127.0.0.1'
def output(message):
    """Print *message* to stdout, prefixed with the current local time.

    The emitted line has the form ``YYYY-MM-DD HH:MM:SS - <message>``.
    """
    stamp = format(datetime.now(), '%Y-%m-%d %H:%M:%S')
    line = f"{stamp} - {message}"
    print(line)
def get_local_ip(remote_addr):
    """Return the local IP address used to reach *remote_addr*.

    A temporary UDP socket is connected to ``(remote_addr, 80)`` and the
    address it was bound to is read back with ``getsockname()``. This
    function sends nothing on that socket.

    Args:
        remote_addr: Host name or IPv4 address of the remote server.

    Returns:
        The local IPv4 address as a string, or ``'127.0.0.1'`` when it
        cannot be determined (the error is logged through ``output``).
    """
    try:
        # The context manager closes the probe socket even when connect()
        # or getsockname() raises, so no descriptor leaks on failure.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect((remote_addr, 80))
            return probe.getsockname()[0]
    except Exception as e:
        # Deliberately best-effort: any failure falls back to loopback.
        output(f"Error getting local IP: {e}")
        return '127.0.0.1'
def server(port=PORT):
    """Run a blocking SCTP server that logs one message per connection.

    Binds to all IPv4 interfaces on *port* and then loops forever: accept
    a connection, read up to 1024 bytes from it, log the text, and close
    the connection.

    Args:
        port: Port number to listen on (defaults to ``PORT``).

    A failure while setting up or accepting on the listening socket is
    logged and terminates the process with exit status 1. A failure while
    reading from an individual connection is logged and the server keeps
    accepting new connections.
    """
    try:
        # Context manager guarantees the listening socket is closed on any
        # exit path, including the sys.exit() below.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM,
                           socket.IPPROTO_SCTP) as server_socket:
            # Bind to all interfaces
            server_socket.bind(('0.0.0.0', port))
            server_socket.listen(5)
            output(f"Server listening on port {port}")
            while True:
                client_socket, addr = server_socket.accept()
                # Close the accepted socket even if reading from it fails.
                with client_socket:
                    output(f"Connection from {addr}")
                    try:
                        # errors='replace' keeps a non-UTF-8 payload from
                        # raising and taking the whole server down.
                        data = client_socket.recv(1024).decode(errors='replace')
                    except OSError as e:
                        # One misbehaving peer must not stop the server.
                        output(f"Connection error from {addr}: {e}")
                        continue
                    output(f"Received: {data}")
    except Exception as e:
        output(f"Server error: {e}")
        sys.exit(1)
def client(server_addr, port=PORT):
    """Connect to an SCTP server and send a single greeting message.

    The greeting is ``"Hello Server, from <local_ip>"``, where the local IP
    is obtained from ``get_local_ip(server_addr)``.

    Args:
        server_addr: Host name or IPv4 address of the server.
        port: Port number to connect to (defaults to ``PORT``).

    Any failure is logged and terminates the process with exit status 1.
    """
    try:
        local_ip = get_local_ip(server_addr)
        # Context manager closes the socket even when connect()/sendall()
        # raises, which the explicit close() at the end did not.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM,
                           socket.IPPROTO_SCTP) as client_socket:
            client_socket.connect((server_addr, port))
            message = f"Hello Server, from {local_ip}"
            # sendall() keeps sending until the whole payload is written
            # or an error is raised; send() may write only part of it.
            client_socket.sendall(message.encode())
            output(f"Sent message: {message}")
    except Exception as e:
        output(f"Client error: {e}")
        sys.exit(1)
def main(argv=None):
    """Parse command-line options and run the selected mode.

    Args:
        argv: Optional list of argument strings. When ``None`` (the
            default), argparse reads the arguments from ``sys.argv[1:]``.

    Options:
        -m/--mode: ``server`` or ``client`` (default ``client``).
        -s/--server_addr: address the client connects to.
        -p/--port: port to connect to or listen on.
    """
    parser = argparse.ArgumentParser(description='SCTP Client/Server')
    parser.add_argument('-m', '--mode', type=str, choices=['server', 'client'],
                        help='Select server or client mode', default='client')
    parser.add_argument('-s', '--server_addr', type=str, default=DEFAULT_SERVER_ADDR,
                        help='Server address to connect to')
    parser.add_argument('-p', '--port', type=int, default=PORT,
                        help='Port to connect to or listen on')
    args = parser.parse_args(argv)

    # argparse's ``choices`` already rejects anything other than 'server'
    # or 'client', so no further validation or fallback branch is needed.
    if args.mode == "server":
        server(port=args.port)
    else:
        output("Running in client mode")
        client(args.server_addr, args.port)
# Run the command-line interface only when executed as a script,
# not when this module is imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment