Created
July 11, 2016 13:37
-
-
Save limboinf/61b5ff2dc2a734038b65b396ba4961a0 to your computer and use it in GitHub Desktop.
下面是一个简单事件驱动实现Echo服务器。
事件驱动的核心就是都有一个循环来轮询socket的活跃性并执行响应。
事件驱动型I/O的一个潜在的优势在于它可以在不使用线程或进程的情况下同时处理成百上千个socket。
事件循环一次处理一个事件,不需要任何其他的并发原语参与。
事件驱动型I/O的缺点在于并没有涉及真正的并发。如果任何一个事件处理方法阻塞了或者
执行了一个耗时较长的计算,那么就会阻塞整个程序的执行进程。 对于阻塞型或者需要长时间运行的计算,可以通过将任务发送给单独的线程或进程来解决。
但是将线程和进程同事件循环进行协调需要较高的技巧。常常用concurrent.futures模块来实现。
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8 | |
""" | |
《Python Cookbook》11.12 理解事件驱动型I/O | |
事件驱动I/O是一种将基本的I/O操作(即:读和写)转换成事件的技术,而我们必须在程序中去处理这种事件。 | |
比如当socket上都到数据时,这成为一个"接收事件", 由我们提供的回调方法或函数去负责处理以此响应这个事件。 | |
一个事件驱动框架可能会以一个基类作为起始点,实现一系列基本的事件处理方法。 | |
:copyright: (c) 2016 by fangpeng(@beginman.cn). | |
:license: MIT, see LICENSE for more details. | |
""" | |
import select | |
import socket | |
class EventHnadler: | |
"""事件驱动基类""" | |
def fileno(self): | |
raise NotImplementedError() | |
def wants_to_receive(self): | |
"""Return True if receiving is allowed""" | |
return False | |
def handle_receive(self): | |
"""Perform the receive operation""" | |
pass | |
def wants_to_send(self): | |
"""Return True if sending is requested""" | |
return False | |
def handle_send(self): | |
"""Send outgoing data""" | |
pass | |
def event_loop(handlers): | |
while True: | |
wants_recv = [h for h in handlers if h.wants_to_receive()] | |
wants_send = [h for h in handlers if h.wants_to_send()] | |
# 事件循环的核心在于select()调用, 轮询文件描述符检查它们是否处于活跃状态 | |
can_recv, can_send, _ = select.select(wants_recv, wants_send, []) | |
for h in can_recv: | |
h.handle_receive() # 处理可读事件 | |
for h in can_send: | |
h.handle_send() # 处理可写事件 | |
class TCPServer(EventHnadler): | |
def __init__(self, address, client_handler, hander_list): | |
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) | |
self.sock.bind(address) | |
self.sock.listen(1) | |
self.client_handler = client_handler | |
self.hander_list = hander_list | |
def fileno(self): | |
return self.sock.fileno() | |
def wants_to_receive(self): | |
return True | |
def handle_receive(self): | |
client, addr = self.sock.accept() | |
print("received client address:", addr) | |
# 在每个连接中都会为客户端创建一个新的处理例程,并添加到事件循环处理列表中 | |
self.hander_list.append(self.client_handler(client, self.hander_list)) | |
class TCPClient(EventHnadler): | |
def __init__(self, sock, handler_list): | |
self.sock = sock | |
self.handler_list = handler_list | |
self.outgoing = bytearray() | |
def fileno(self): | |
return self.sock.fileno() | |
def close(self): | |
self.sock.close() | |
# 当连接关闭时,每个客户端都必须将它们自己从列表中移除。 | |
self.handler_list.remove(self) | |
def wants_to_send(self): | |
return True if self.outgoing else False | |
def handle_send(self): | |
nsent = self.sock.send(self.outgoing) | |
self.outgoing = self.outgoing[nsent:] | |
class TCPEchoClient(TCPClient): | |
def wants_to_receive(self): | |
return True | |
def handle_receive(self): | |
data = self.sock.recv(8192) | |
if not data: | |
self.close() | |
else: | |
self.outgoing.extend(data) | |
if __name__ == '__main__': | |
handlers = [] | |
handlers.append(TCPServer(('', 9000), TCPEchoClient, handlers)) | |
event_loop(handlers) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment