Created
December 8, 2017 01:47
-
-
Save 1st1/1c606e5b83ef0e9c41faf21564d75ad7 to your computer and use it in GitHub Desktop.
get_buffer_bench.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import struct | |
try: | |
from time import perf_counter as clock | |
except ImportError: | |
from time import time as clock | |
import asyncio | |
import uvloop | |
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) | |
class FrameDecoder: | |
""" | |
Framing layer mixin with custom buffering logic. | |
""" | |
def connection_made(self, transport): | |
self.transport = transport | |
self._msg_size = None | |
self._new_buffer(8) | |
def _new_buffer(self, size): | |
self.buffer = bytearray(size) | |
self.buffer_view = memoryview(self.buffer) | |
self.pos = 0 | |
def get_buffer(self): | |
return self.buffer_view[self.pos:] | |
def buffer_updated(self, nbytes): | |
self.pos += nbytes | |
if self._msg_size is None: | |
if self.pos == 8: | |
self._msg_size = struct.unpack('L', self.buffer)[0] | |
self._new_buffer(self._msg_size) | |
else: | |
if self.pos == self._msg_size: | |
msg = self.buffer | |
self._new_buffer(8) | |
self._msg_size = None | |
self.message_received(msg) | |
def message_received(self, msg): | |
raise NotImplementedError | |
def send_message(self, msg): | |
self.transport.write(struct.pack('L', len(msg))) | |
self.transport.write(msg) | |
class BenchServerProtocol(FrameDecoder, asyncio.Protocol): | |
def connection_lost(self, exc): | |
print('The client closed the connection:', exc) | |
def message_received(self, msg): | |
print('server', len(msg)) | |
self.send_message(msg) | |
class BenchClientProtocol(FrameDecoder, asyncio.Protocol): | |
def __init__(self): | |
self._evt_done = asyncio.Event() | |
def connection_lost(self, exc): | |
print('The server closed the connection:', exc) | |
def message_received(self, msg): | |
print('client', len(msg)) | |
self._evt_done.set() | |
async def wait_until_complete(self): | |
await self._evt_done.wait() | |
async def f(): | |
data = b"x" * (100 * 1000**2) # 100 MB | |
niters = 5 | |
loop = asyncio.get_event_loop() | |
server = await loop.create_server(BenchServerProtocol, '127.0.0.1', 8000) | |
start = clock() | |
for i in range(niters): | |
_, client = await loop.create_connection(BenchClientProtocol, | |
'127.0.0.1', 8000) | |
client.send_message(data) | |
await client.wait_until_complete() | |
end = clock() | |
server.close() | |
dt = end - start | |
rate = len(data) * niters / dt | |
print("duration: %s => rate: %d MB/s" | |
% (dt, rate / 1e6)) | |
if __name__ == "__main__": | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(f()) | |
loop.close() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment