Created
February 25, 2020 22:04
-
-
Save standarddeviant/52ea3d5a4ef558d41b4d72be224ce161 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io | |
import numpy as np | |
from numpy.random import randint, standard_normal | |
# Element types the random generator below can pick from.
dtypes = ('float32', 'float64', 'int64', 'int32', 'int16')


def make_random_array():
    """Build a small random array with a random rank, shape and dtype.

    The number of dimensions and the length of each dimension are both
    drawn from 1..4 (``randint``'s upper bound is exclusive).  Float dtypes
    are filled from a standard normal distribution and then cast; integer
    dtypes are filled with uniform draws from ``[0, 2**15)``.

    Returns:
        tuple: ``(array, shape, dtype_name)`` where ``shape`` is the 1-D
        integer array that was used as the ``size`` argument and
        ``dtype_name`` is the string chosen from ``dtypes``.
    """
    # Draw order matters for reproducibility under a fixed seed:
    # rank first, then the dimensions, then the dtype index.
    rank = randint(1, 5)
    shape = randint(1, 5, size=rank)
    dtype_name = dtypes[randint(0, len(dtypes))]

    if dtype_name.startswith('float'):
        values = standard_normal(shape).astype(dtype_name)
    elif dtype_name.startswith('int'):
        values = randint(0, high=2**15, size=shape, dtype=dtype_name)
    else:
        # Not reachable with the current ``dtypes`` tuple.
        values = None
    return values, shape, dtype_name
def pack_array_to_bytes(a):
    """Serialize an array to ``.npy``-format bytes without touching disk.

    Args:
        a: Array (or array-like) handed to ``np.save``.

    Returns:
        bytes: The full ``.npy`` payload written by ``np.save``.
    """
    # ``fix_imports`` is deliberately not passed: NumPy has ignored it on
    # save since 1.17 and deprecates it from 2.1, so passing it only
    # produces a DeprecationWarning.
    with io.BytesIO() as buffer:
        np.save(buffer, a, allow_pickle=True)
        return buffer.getvalue()
def unpack_bytes_to_array(b, *, allow_pickle=True):
    """Rebuild an array from bytes in ``.npy`` format.

    Args:
        b: Bytes-like payload, e.g. the output of ``np.save`` into a buffer.
        allow_pickle: Forwarded to ``np.load``.  Defaults to ``True`` to keep
            the historical behavior of this helper.

    Returns:
        Whatever ``np.load`` returns for the payload (an ndarray for ``.npy``
        data).

    Raises:
        ValueError: If the payload holds an object array and
            ``allow_pickle`` is ``False``.

    SECURITY: with ``allow_pickle=True`` a crafted payload can execute
    arbitrary code while being unpickled.  Pass ``allow_pickle=False`` when
    the bytes come from an untrusted source (for example a network peer);
    plain numeric arrays load fine without pickle.
    """
    # The buffer is intentionally left open: np.load may read lazily from it
    # for some formats, so closing it here could invalidate the result.
    buffer = io.BytesIO(b)
    return np.load(buffer, allow_pickle=allow_pickle, fix_imports=True)
def describe_array(a, b=None):
    """Print the dtype and shape of an array, plus its packed size if given.

    Args:
        a: Object exposing ``dtype`` and ``shape`` attributes.
        b: Optional sized payload (the packed bytes of ``a``).  Its length is
            printed whenever it is supplied, including when it is empty.
    """
    print("dtype = {}".format(str(a.dtype)))
    print("shape = {}".format(str(a.shape)))
    # Compare against None rather than relying on truthiness: an empty
    # payload is still a payload, and array-like objects raise on bool().
    if b is not None:
        print("byte_arr length = {}".format(len(b)))
def test_array_equality(a1, a2):
    """Print whether two arrays match in shape/elements and in dtype.

    Two independent checks are reported, each on its own line, followed by
    a blank line:

    * shape and element values, via ``np.array_equal``;
    * dtype, via ``a1.dtype == a2.dtype``.

    Args:
        a1: First array.
        a2: Second array.
    """
    # np.array_equal is False when shapes differ and otherwise compares
    # every element, so it covers both halves of the printed message.
    if np.array_equal(a1, a2):
        print('YAY! : shape + elements equal')
    else:
        print('OH NO!!!!!! : shape + elements ***********NOT*********** EQUAL')

    # dtype is checked separately because array_equal ignores it.
    if a1.dtype == a2.dtype:
        print('YAY! : dtype equal')
    else:
        print('OH NO!!!!!! : dtype ***********NOT*********** EQUAL')
    print()


# The ``test_`` prefix would make pytest try to collect this helper as a test
# (and fail looking for ``a1``/``a2`` fixtures); opt it out explicitly.
test_array_equality.__test__ = False
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import asyncio | |
import websockets | |
from websockets.exceptions import ConnectionClosedOK | |
# test_client.py | |
from numpy_byte_pack_utils import unpack_bytes_to_array, describe_array | |
async def recv_arrays(uri="ws://localhost:8765"):
    """Connect to the array server and describe every array it sends.

    Each received message is decoded with ``unpack_bytes_to_array`` and
    summarized with ``describe_array`` under a 1-based "Test array N" header.

    Args:
        uri: WebSocket URI to connect to.  Defaults to the local test server.

    The coroutine returns once the server closes the connection normally
    (``ConnectionClosedOK``); other exceptions propagate to the caller.
    """
    received = 0
    async with websockets.connect(uri) as websocket:
        while True:
            # Only recv() can signal the normal end of the stream, so keep
            # the try body limited to it.
            try:
                payload = await websocket.recv()
            except ConnectionClosedOK:
                break
            # NOTE(review): unpack_bytes_to_array loads with pickle enabled;
            # only point this client at a server you trust.
            array = unpack_bytes_to_array(payload)
            received += 1
            print("Test array {}".format(received))
            describe_array(array, payload)
# asyncio.run creates and closes its own event loop.  The previous
# get_event_loop().run_until_complete(...) form is deprecated when no loop is
# running and raises RuntimeError on recent Python versions.
asyncio.run(recv_arrays())
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import asyncio | |
import websockets | |
# test_server.py | |
from numpy_byte_pack_utils import make_random_array, pack_array_to_bytes, \ | |
describe_array | |
async def recv_arrays(websocket, path=None):
    """Connection handler: send ten random arrays to the client, then close.

    For each array the handler prints a 1-based "Test array N" header and a
    description, sends the packed bytes, and waits one second before the
    next one.

    Args:
        websocket: The connection object supplied by the websockets server.
        path: Request path passed by older websockets versions.  Unused here;
            it defaults to ``None`` so the handler also works when the
            library calls it with the connection only.
    """
    for ix in range(10):
        # Only the array itself is needed; shape and dtype name are ignored.
        array, _, _ = make_random_array()
        payload = pack_array_to_bytes(array)
        print("Test array {}".format(ix + 1))
        describe_array(array, payload)
        await websocket.send(payload)
        await asyncio.sleep(1)
    # Close explicitly so the client sees a normal end of stream.
    await websocket.close()
async def _serve_forever():
    """Serve ``recv_arrays`` on localhost:8765 until the process is stopped."""
    async with websockets.serve(recv_arrays, "localhost", 8765):
        # A bare Future never completes, which keeps the server running.
        await asyncio.Future()


# asyncio.run owns the event loop.  The previous
# get_event_loop().run_until_complete(...) / run_forever() bootstrap is
# deprecated when no loop is running and fails on recent Python versions.
asyncio.run(_serve_forever())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment