Skip to content

Instantly share code, notes, and snippets.

@standarddeviant
Created February 25, 2020 22:04
Show Gist options
  • Save standarddeviant/52ea3d5a4ef558d41b4d72be224ce161 to your computer and use it in GitHub Desktop.
Save standarddeviant/52ea3d5a4ef558d41b4d72be224ce161 to your computer and use it in GitHub Desktop.
import io
import numpy as np
from numpy.random import randint, standard_normal
dtypes = ('float32', 'float64', 'int64', 'int32', 'int16')
def make_random_array():
s = randint(1, 5, size=randint(1, 5))
d = dtypes[randint(0, len(dtypes))]
a = None
if d.startswith('float'):
a = standard_normal(s).astype(d)
elif d.startswith('int'):
a = randint(0, high=2**15, size=s, dtype=d)
return a, s, d
def pack_array_to_bytes(a):
fake_file_write = io.BytesIO()
np.save(fake_file_write, a, allow_pickle=True, fix_imports=True)
byte_arr = fake_file_write.getvalue()
return byte_arr
def unpack_bytes_to_array(b):
fake_file_read = io.BytesIO(b)
a = np.load(fake_file_read, allow_pickle=True, fix_imports=True)
return a
def describe_array(a, b=None):
print("dtype = {}".format(str(a.dtype)))
print("shape = {}".format(str(a.shape)))
if b:
print("byte_arr length = {}".format(len(b)))
def test_array_equality(a1, a2):
# test shape and elements with np.array_equal
if a1.shape == a2.shape:
print('YAY! : shape + elements equal')
else:
print('OH NO!!!!!! : shape + elements ***********NOT*********** EQUAL')
# test if dtype is equal
if a1.dtype == a2.dtype:
print('YAY! : dtype equal')
else:
print('OH NO!!!!!! : dtype ***********NOT*********** EQUAL')
print()
#!/usr/bin/env python
import asyncio
import websockets
from websockets.exceptions import ConnectionClosedOK
# test_client.py
from numpy_byte_pack_utils import unpack_bytes_to_array, describe_array
async def recv_arrays():
uri = "ws://localhost:8765"
ix = 0
async with websockets.connect(uri) as websocket:
while True:
try:
b = await websocket.recv()
a = unpack_bytes_to_array(b)
print("Test array {}".format(ix+1))
describe_array(a, b)
ix += 1
except ConnectionClosedOK:
break
asyncio.get_event_loop().run_until_complete(recv_arrays())
#!/usr/bin/env python
import asyncio
import websockets
# test_server.py
from numpy_byte_pack_utils import make_random_array, pack_array_to_bytes, \
describe_array
async def recv_arrays(websocket, path):
for ix in range(10):
a, s, d = make_random_array()
b = pack_array_to_bytes(a)
print("Test array {}".format(ix+1))
describe_array(a, b)
await websocket.send(b)
await asyncio.sleep(1)
await websocket.close()
start_server = websockets.serve(recv_arrays, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment