Created
May 28, 2021 12:51
-
-
Save lezwon/cdd486bdbdf00b40c2444a6e3ee21951 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
from select import select | |
from collections import deque | |
import time | |
import heapq | |
class Scheduler: | |
def __init__(self): | |
self.ready = deque() # Holds the tasks to be executed | |
self.sleeping = [] # Holds the sleeping tasks | |
self.read_waiting = {} # Callbacks to read from socket | |
self.write_waiting = {} # Callback to write to socket | |
def call_soon(self, func): | |
''' Adds the func to the ready queue immediately ''' | |
self.ready.append(func) | |
def call_later(self, sleep, func): | |
''' | |
Adds the func to the sleeping queue | |
after calcualting deadline | |
''' | |
deadline = time.time() + sleep | |
heapq.heappush(self.sleeping, (deadline, func)) | |
def read_wait(self, fileno, func): | |
''' Adds callback for reading a socket ''' | |
self.read_waiting[fileno] = func | |
def write_wait(self, fileno, func): | |
''' Adds callback for writing to a socket ''' | |
self.write_waiting[fileno] = func | |
def run(self): | |
''' Run the Event loop ''' | |
while self.ready or self.sleeping or self.read_waiting or self.write_waiting: | |
if not self.ready: | |
if self.sleeping: | |
deadline, _ = self.sleeping[0] | |
timeout = deadline - time.time() # Calculate timeout | |
if timeout < 0: | |
timeout = 0 | |
else: | |
timeout = None | |
# Use timeout in select call to check Network I/O | |
ready_read, ready_wait, _ = select(self.read_waiting, self.write_waiting, [], timeout) | |
for fd in ready_read: | |
self.ready.append(self.read_waiting.pop(fd)) | |
for fd in ready_wait: | |
self.ready.append(self.write_waiting.pop(fd)) | |
# Check Sleeping tasks | |
now = time.time() | |
while self.sleeping: | |
if self.sleeping[0][0] < now: | |
deadline, func = heapq.heappop(self.sleeping) # Pop expired sleeping func | |
self.ready.append(func) # add it to ready queue | |
else: | |
break | |
# Execute the ready tasks | |
while self.ready: | |
func = self.ready.popleft() | |
func() | |
addr = ('127.0.0.1', 3000) | |
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) | |
sock.bind(addr) | |
sock.listen(1) | |
sock.setblocking(False) | |
maxbytes = 10000 | |
def accept(sock): | |
''' Accepts a connection ''' | |
client, addr = sock.accept() | |
print("Connected to ", addr) | |
sched.read_wait(client, lambda: recv(client)) | |
def recv(client): | |
''' Recives message from client ''' | |
try: | |
data = client.recv(maxbytes) | |
if not data: | |
raise ConnectionError | |
except (ConnectionError, ConnectionResetError) as e: | |
client.close() | |
print("Connection Closed") | |
sched.read_wait(sock, lambda: accept(sock)) | |
return | |
except Exception as e: | |
print(e) | |
print("Received message: ", data.decode()) | |
sched.write_wait(client, lambda: send(client, data)) | |
def send(sock, data): | |
''' Send message to client socket ''' | |
sock.send(b'Got: ' + data) | |
sched.read_wait(sock, lambda: recv(sock)) | |
def countdown(n): | |
if n > 0: | |
print('Down', n) | |
sched.call_later(4, lambda: countdown(n-1)) | |
def countup(n, i=0): | |
print('Up', i) | |
sched.call_later(1, lambda: countup(n, i+1)) | |
sched = Scheduler() | |
sched.read_wait(sock, lambda: accept(sock)) | |
sched.call_soon(lambda: countdown(6)) | |
sched.call_soon(lambda: countup(6)) | |
sched.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment