Created
October 20, 2019 00:00
-
-
Save aferral/e8b90289bf0a5cb102ea79077fc0025c to your computer and use it in GitHub Desktop.
Send and receive small numpy images using unix domain socket with socker.SOCK_DGRAM (atomic messages)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import numpy as np | |
import random | |
import struct | |
import socket | |
import sys | |
import os | |
import sys | |
from collections import namedtuple | |
""" | |
Send and receive small numpy images using unix domain socket with socker.SOCK_DGRAM (atomic messages) | |
Note: It cannot send large images because there is a limit in msg size. | |
usage: | |
# create a received | |
python test_socket.py 1 | |
# create a sender | |
python test_socket.py 0 | |
""" | |
keys=['timestamp','value','test','img_h','img_w','img_c','img_data'] | |
Msg = namedtuple('Msg', keys) | |
argv = sys.argv | |
modo = argv[1] | |
server_address = './uds_socket' | |
img_shape = (50,50,3) | |
img_dtype = np.float64 | |
def get_header(img_shape,numpy_dtype): | |
assert(len(img_shape) == 3) | |
n_elems = img_shape[0]*img_shape[1]*img_shape[2] | |
elem_size = np.dtype(numpy_dtype).itemsize | |
data_string = "{0}c".format(elem_size * n_elems) | |
header_string = "fffiii" | |
size_header = struct.calcsize(header_string) | |
size_data = struct.calcsize(data_string) | |
return keys,(header_string,size_header),(data_string,size_data) | |
def load_msg(data_raw,img_shape,img_dtype): | |
keys, header_t,data_t = get_header(img_shape, img_dtype) | |
header_format,h_size = header_t | |
data_format,d_size = data_t | |
data_header = data_raw[0:h_size] | |
data_img = data_raw[h_size:] | |
unpack_header=struct.unpack(header_format, data_header) | |
h,w,c=img_shape | |
img = np.frombuffer(data_img,dtype=img_dtype).reshape(h,w,c) | |
all_decode_vals = list(unpack_header)+[img] | |
return Msg._make(all_decode_vals) | |
def format_msg(img): | |
assert(len(img.shape) == 3) | |
keys, header_t,data_t = get_header(img.shape, img.dtype) | |
bytes_strings = header_t[0] | |
timestamp = time.time() | |
value = random.random() | |
test = 32 | |
img_raw_data = img.data.tobytes() | |
w,h,c = img.shape | |
vals = [timestamp, value, test, w, h, c] | |
header_data=struct.pack(bytes_strings, *vals) | |
return header_data + img_raw_data | |
# lector | |
if int(modo) == 0: | |
try: | |
os.unlink(server_address) | |
except OSError: | |
if os.path.exists(server_address): | |
raise | |
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) | |
print('starting up on %s' % server_address) | |
sock.bind(server_address) | |
_,h_t,d_t=get_header(img_shape,img_dtype) | |
size=h_t[1]+d_t[1] | |
while True: | |
print('waiting for a connection') | |
try: | |
while True: | |
data = sock.recv(size) | |
print("received {0} bytes".format(len(data))) | |
msg = load_msg(data,img_shape,img_dtype) | |
finally: | |
sock.close() | |
elif int(modo) == 1: # sender | |
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) | |
print('connecting to %s' % server_address) | |
try: | |
sock.connect(server_address) | |
except socket.error as msg: | |
print(sys.stderr, msg) | |
sys.exit(1) | |
try: | |
for i in range(10): | |
img = np.ones(img_shape,dtype=img_dtype) | |
data_to_send=format_msg(img) | |
print('sending {0} bytes'.format(len(data_to_send))) | |
sock.sendall(data_to_send) | |
finally: | |
print('closing socket') | |
sock.close() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment