This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from socket import socket, AF_INET, SOCK_DGRAM | |
| s = socket(AF_INET, SOCK_DGRAM) | |
| s.sendto(b'', ('localhost', 20000)) | |
| print(s.recvfrom(8192)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from socketserver import BaseRequestHandler, UDPServer | |
| import time | |
| class TimeHandler(BaseRequestHandler): | |
| def handle(self): | |
| print('Got connection from', self.client_address) | |
| # Get message and client socket | |
| msg, sock = self.request | |
| resp = time.ctime() | |
| sock.sendto(resp.encode('ascii'), self.client_address) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from socket import socket, AF_INET, SOCK_DGRAM | |
| import time | |
| def time_server(address): | |
| sock = socket(AF_INET, SOCK_DGRAM) | |
| sock.bind(address) | |
| while True: | |
| msg, addr = sock.recvfrom(8192) | |
| print('Got message from', addr) | |
| resp = time.ctime() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| class EventHandler: | |
| def fileno(self): | |
| 'Return the associated file descriptor' | |
| raise NotImplemented('must implement') | |
| def wants_to_receive(self): | |
| 'Return True if receiving is allowed' | |
| return False | |
| def handle_receive(self): |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from socket import socket, AF_INET, SOCK_STREAM | |
| s = socket(AF_INET, SOCK_STREAM) | |
| s.connect(('localhost', 16000)) | |
| s.send(b'Hello\n') | |
| print('Got:', s.recv(8192)) | |
| s.close() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # TCP Example | |
| import socket | |
| from eventhandler import EventHandler, event_loop | |
| class TCPServer(EventHandler): | |
| def __init__(self, address, client_handler, handler_list): | |
| self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) | |
| self.sock.bind(address) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import socket | |
| from concurrent.futures import ThreadPoolExecutor | |
| from eventhandler import EventHandler, event_loop | |
| class ThreadPoolHandler(EventHandler): | |
| def __init__(self, nworkers): | |
| self.signal_done_sock, self.done_sock = socket.socketpair() | |
| self.pending = [] | |
| self.pool = ThreadPoolExecutor(nworkers) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from socket import * | |
| sock = socket(AF_INET, SOCK_DGRAM) | |
| for x in range(40): | |
| sock.sendto(str(x).encode('ascii'), ('localhost', 16000)) | |
| resp = sock.recvfrom(8192) | |
| print(resp[0]) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from socket import * | |
| s = socket(AF_INET, SOCK_DGRAM) | |
| s.sendto(b'', ('localhost', 14000)) | |
| print(s.recvfrom(128)) | |
| s.sendto(b'Hello', ('localhost', 15000)) | |
| print(s.recvfrom(128)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import socket | |
| import time | |
| from eventhandler import EventHandler, event_loop | |
| class UDPServer(EventHandler): | |
| def __init__(self, address): | |
| self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
| self.sock.bind(address) |