Created
October 2, 2019 20:35
-
-
Save caffeinatedgaze/211323f771cd662b12a425c6a9465f75 to your computer and use it in GitHub Desktop.
Distributed Systems Lab on Socket Communication
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
from socket import \ | |
socket, \ | |
AF_INET, \ | |
SOCK_STREAM, \ | |
SOL_SOCKET, \ | |
SO_REUSEADDR | |
from selectors import DefaultSelector, EVENT_READ | |
from sys import argv, stderr | |
from threading import Thread | |
from types import SimpleNamespace | |
from os.path import isfile | |
def eprint(*args, **kwargs):
    """Print to standard error; takes the same arguments as ``print``.

    ``file`` defaults to ``stderr``. An explicit ``file=...`` keyword is
    honoured instead of raising ``TypeError`` for a duplicated keyword.
    """
    kwargs.setdefault('file', stderr)
    print(*args, **kwargs)
# Registry of connected clients: records with a ``name`` attribute that are
# appended when a connection is accepted and pruned by remove_client().
clients = []


def remove_client(name):
    """Remove every client whose ``name`` attribute equals ``name``.

    The shared ``clients`` list is filtered in place rather than rebound, so
    every holder of the list object observes the removal.

    Args:
        name: Client name to drop; a name that is not present is ignored.
    """
    clients[:] = [client for client in clients if client.name != name]
class Dispatcher(Thread):
    """Daemon thread that accepts TCP connections and hands them to a selector.

    Each accepted connection is made non-blocking, paired with a client record
    (``name``, ``current_file``, ``outb``), appended to the module-level
    ``clients`` list and registered with the selector for read events.
    """

    def __init__(self, arguments, selector):
        """
        Create a master socket for a specific interface and port;
        if the interface is '_' then listen on all available interfaces.

        Args:
            arguments: argv-style sequence ``[program, interface, port]``.
            selector: Selector that accepted connections are registered with.

        Raises:
            Exception: Fewer than three arguments were given.
            ValueError: The port argument is not an integer.
        """
        super().__init__(daemon=True)
        if len(arguments) < 3:
            eprint('Not enough arguments\n'
                   'Usage:\n'
                   'alice interface port\n')
            raise Exception('Not enough arguments')
        try:
            port = int(arguments[2])
        except ValueError:
            eprint('Non-int port is given')
            raise  # Re-raise the original error so its message is kept
        interface = arguments[1] if arguments[1] != '_' else ''
        self.selector = selector
        self.info = (interface, port)
        self.sock = socket(AF_INET, SOCK_STREAM)
        self.sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)  # Turn on address reuse

    def run(self):
        """Bind, listen and accept connections forever.

        Clients are named ``u1``, ``u2``, ... in order of acceptance.
        """
        self.sock.bind(self.info)
        self.sock.listen()
        next_name = 1
        while True:
            con, addr = self.sock.accept()
            con.setblocking(False)
            name = 'u' + str(next_name)
            # Advance the counter: remove_client() matches on the name, so a
            # shared name would drop every client when one disconnects.
            next_name += 1
            client = SimpleNamespace(name=name, current_file='', outb='')
            clients.append(client)
            # Registered for read events only; the server never writes back.
            self.selector.register(con, EVENT_READ, data=client)
            print(f'New CIR is accepted from {addr}')
class Server(Thread):
    """
    Handle all incoming connections.

    The first chunk received from a client is taken as the name of the file
    to create; every later chunk is appended to that file. An empty read
    (peer closed the connection) ends the transfer.

    NOTE(review): the protocol relies on the file name arriving in its own
    ``recv`` chunk; TCP does not guarantee message boundaries.
    """

    def __init__(self, selector):
        """Store the selector whose ready sockets this thread services."""
        super().__init__()
        self.selector = selector

    @staticmethod
    def _gen_filename(filename):
        """Return ``filename``, or ``filename`` plus the first free numeric suffix.

        If ``filename`` already exists, ``filename0``, ``filename1``, ... are
        probed until a name that is not an existing file is found.
        """
        if not isfile(filename):
            return filename
        index = 0
        while isfile(filename + str(index)):
            index += 1
        return filename + str(index)

    @staticmethod
    def _write_file(filename, data):
        """Append ``data`` (bytes) to ``filename``, creating the file if needed."""
        with open(filename, 'ab') as f:
            f.write(data)

    def _read(self, sock, client):
        """Consume one chunk from ``sock`` and update the client's transfer."""
        from os.path import basename

        data = sock.recv(1024)
        if not data:
            # Peer closed the connection. Appending nothing still creates the
            # file, so an empty transfer yields an empty file. A client that
            # never sent a name has no file to touch (open('') would raise).
            if client.current_file:
                self._write_file(client.current_file, data)
            self._close(sock, client)
            return
        if not client.current_file:
            # The name comes from the network: keep only its final component
            # so a peer cannot write outside the working directory.
            client.current_file = self._gen_filename(basename(data.decode()))
        else:
            self._write_file(client.current_file, data)

    def _close(self, sock, client):
        """Forget the client, unregister its socket and close it."""
        remove_client(client.name)
        self.selector.unregister(sock)
        sock.close()
        print(f'{client.name} disconnected')

    def run(self):
        """Wait for readable sockets forever and service each one."""
        while True:
            events = self.selector.select(timeout=None)
            for key, mask in events:
                sock = key.fileobj
                client = key.data
                if mask & EVENT_READ:
                    self._read(sock, client)
def main():
    """Start the receiving server: one Server thread plus one Dispatcher.

    Raises:
        Exception: Too few command-line arguments (raised by Dispatcher).
        ValueError: The port argument is not an integer (raised by Dispatcher).
    """
    selector = DefaultSelector()
    # Build the dispatcher first: it validates argv, and failing here, before
    # the non-daemon Server thread exists, lets the process exit instead of
    # hanging on that thread.
    dispatcher = Dispatcher(argv, selector)
    Server(selector).start()
    # start() + join() instead of run(): join() on a thread that was never
    # started raises RuntimeError.
    dispatcher.start()
    dispatcher.join()
    print('All joined')  # Reached only if the accept loop terminates


if __name__ == '__main__':
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
from sys import argv, stderr | |
from time import sleep | |
from os.path import getsize | |
from tqdm import tqdm | |
from socket import \ | |
socket, \ | |
AF_INET, \ | |
SOCK_STREAM | |
def eprint(*args, **kwargs):
    """Print to standard error; takes the same arguments as ``print``.

    ``file`` defaults to ``stderr``. An explicit ``file=...`` keyword is
    honoured instead of raising ``TypeError`` for a duplicated keyword.
    The keyword dict is named ``kwargs`` so it does not shadow ``sys.argv``.
    """
    kwargs.setdefault('file', stderr)
    print(*args, **kwargs)
def main():
    """Send the file named on the command line to a listening receiver.

    Usage: ``bob file rcpt_addr rcpt_port``. The file name is sent first,
    then the file content in 1024-byte chunks while a progress bar advances.
    Arguments beyond the first three are ignored.

    Raises:
        Exception: Fewer than three command-line arguments were given.
        ValueError: ``rcpt_port`` is not an integer.
        OSError: The file cannot be read or the connection fails.
    """
    if len(argv) < 4:
        eprint('Not enough arguments\n'
               'Usage:\n'
               'bob file rcpt_addr rcpt_port\n')
        raise Exception('Not enough arguments')
    filename, rcpt_addr, rcpt_port = argv[1:4]
    rcpt_port = int(rcpt_port)
    # Stat and open the file before connecting, so a missing file fails here
    # rather than after the receiver has already been sent a name.
    size = getsize(filename)
    with open(filename, 'rb') as source, socket(AF_INET, SOCK_STREAM) as sock:
        sock.connect((rcpt_addr, rcpt_port))
        # sendall() retries until every byte is written; send() may stop early.
        sock.sendall(filename.encode())
        # The receiver takes its first recv() chunk as the file name, so pause
        # to keep the name from sharing a TCP segment with the first data.
        sleep(0.001)
        with tqdm(total=size, unit='bytes') as progress:
            for chunk in iter(lambda: source.read(1024), b''):
                # NOTE(review): per-chunk pause kept from the original;
                # presumably throttles the transfer — confirm it is needed.
                sleep(0.001)
                sock.sendall(chunk)
                progress.update(len(chunk))


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment