Last active
December 2, 2020 15:40
-
-
Save cmpute/eaee40db68233612b8493f3b7985b67f to your computer and use it in GitHub Desktop.
Simplest IPC service for large data in Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import msgpack | |
import msgpack_numpy | |
msgpack_numpy.patch() | |
import socket, time, os | |
from socket_helpers import recv_msg, send_msg | |
# Client side of the demo: once per second, send a random array to the data
# socket and print the mean that comes back on the result socket.
DATA_SOCK_PATH = "/tmp/ros-ipc-data.sock"      # requests are sent here
RESULT_SOCK_PATH = "/tmp/ros-ipc-result.sock"  # replies are received here

# Outgoing channel: a datagram socket connected to the data endpoint.
sout = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_DGRAM)
sout.connect(DATA_SOCK_PATH)

# A previous run that was killed before its cleanup ran leaves the socket file
# behind, and bind() on an existing path fails with "address already in use".
try:
    os.remove(RESULT_SOCK_PATH)
except FileNotFoundError:
    pass

# Incoming channel: this process owns the result endpoint.
sin = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_DGRAM)
sin.bind(RESULT_SOCK_PATH)

try:
    while True:
        arr = np.random.rand(10000, 4)
        payload = msgpack.packb({b"data": arr})
        print("Send Time:", time.time())
        send_msg(sout, payload)
        result = recv_msg(sin)
        print("Mean Result:", msgpack.unpackb(result)[b'mean'])
        time.sleep(1)
finally:
    # Runs on any exit from the loop (including Ctrl-C): release both sockets
    # and unlink the path this process bound.
    sin.close()
    sout.close()
    os.remove(RESULT_SOCK_PATH)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import msgpack | |
import msgpack_numpy | |
msgpack_numpy.patch() | |
import socket, time, os | |
from socket_helpers import recv_msg, send_msg | |
# Blocking server side of the demo: receive an array on the data socket and
# send its mean back on the result socket.
DATA_SOCK_PATH = "/tmp/ros-ipc-data.sock"      # requests are received here
RESULT_SOCK_PATH = "/tmp/ros-ipc-result.sock"  # replies are sent here

# A previous run that was killed before its cleanup ran leaves the socket file
# behind, and bind() on an existing path fails with "address already in use".
try:
    os.remove(DATA_SOCK_PATH)
except FileNotFoundError:
    pass

sin = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_DGRAM)
sin.bind(DATA_SOCK_PATH)
sout = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_DGRAM)
# The result endpoint is connected lazily, on the first request, so it does
# not have to exist yet when this script starts.
sout_connected = False

try:
    while True:
        arr = recv_msg(sin)
        print("Recv Time:", time.time())
        arr = msgpack.unpackb(arr)[b'data']
        reply = msgpack.packb({b'mean': np.mean(arr)})
        if not sout_connected:
            sout.connect(RESULT_SOCK_PATH)
            # Remember the connection so later messages skip the connect().
            sout_connected = True
        try:
            send_msg(sout, reply)
        except ConnectionError:
            # The peer we connected to is gone (e.g. the client restarted and
            # re-bound the path). Re-resolve the path once and resend; a
            # second failure propagates.
            sout.connect(RESULT_SOCK_PATH)
            send_msg(sout, reply)
finally:
    # Release both sockets and unlink the path this process bound.
    sin.close()
    sout.close()
    os.remove(DATA_SOCK_PATH)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import numpy as np | |
import msgpack | |
import msgpack_numpy | |
msgpack_numpy.patch() | |
import socket, time, os | |
from socket_helpers_async import recv_msg, send_msg | |
# asyncio server side of the demo: receive an array on the data socket and
# send its mean back on the result socket.
DATA_SOCK_PATH = "/tmp/ros-ipc-data.sock"      # requests are received here
RESULT_SOCK_PATH = "/tmp/ros-ipc-result.sock"  # replies are sent here

# A previous run that was killed before its cleanup ran leaves the socket file
# behind, and bind() on an existing path fails with "address already in use".
try:
    os.remove(DATA_SOCK_PATH)
except FileNotFoundError:
    pass

sin = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_DGRAM)
sin.bind(DATA_SOCK_PATH)
sout = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_DGRAM)
# The event loop's sock_* methods are documented to require non-blocking
# sockets; a blocking socket would stall the whole loop while waiting.
sin.setblocking(False)
sout.setblocking(False)
# The result endpoint is connected lazily, on the first request.
sout_connected = False


async def handle_data():
    """Serve requests forever: read an array, reply with its mean.

    Uses the module-level ``loop``, ``sin`` and ``sout``. On exit (normal or
    via an exception) both sockets are closed and the data socket path is
    unlinked.
    """
    global sout_connected
    try:
        while True:
            arr = await recv_msg(loop, sin)
            print("Recv Time:", time.time())
            arr = msgpack.unpackb(arr)[b'data']
            reply = msgpack.packb({b'mean': np.mean(arr)})
            if not sout_connected:
                await loop.sock_connect(sout, RESULT_SOCK_PATH)
                # Remember the connection so later messages skip the connect.
                sout_connected = True
            try:
                await send_msg(loop, sout, reply)
            except ConnectionError:
                # The peer we connected to is gone (e.g. the client restarted
                # and re-bound the path). Re-resolve the path once and resend;
                # a second failure propagates.
                await loop.sock_connect(sout, RESULT_SOCK_PATH)
                await send_msg(loop, sout, reply)
    finally:
        sin.close()
        sout.close()
        os.remove(DATA_SOCK_PATH)


# Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
# loop is running or set.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(handle_data())
finally:
    loop.close()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Helper Functions for sending large data through sockets | |
# https://stackoverflow.com/questions/17667903/python-socket-receive-large-amount-of-data | |
import struct | |
# Maximum number of payload bytes handed to the socket per send call.
PACK_SIZE = 4096


def send_msg(sock, msg):
    """Send ``msg`` over ``sock`` as one length-prefixed frame.

    The frame is a 4-byte unsigned big-endian length followed by the payload,
    which is written in pieces of at most ``PACK_SIZE`` bytes.

    Args:
        sock: A socket object providing ``sendall``.
        msg: The payload, a bytes-like object.

    Raises:
        struct.error: If ``len(msg)`` does not fit in 4 bytes.
        OSError: Propagated from the socket on send failure.
    """
    dlen = len(msg)
    # sendall (not send) so a short write cannot truncate the header.
    sock.sendall(struct.pack('>I', dlen))
    # Slicing a memoryview hands out chunks without copying the payload.
    view = memoryview(msg)
    for start in range(0, dlen, PACK_SIZE):
        sock.sendall(view[start:start + PACK_SIZE])
def recvall(sock, n):
    """Read exactly ``n`` bytes from ``sock``.

    Keeps calling ``sock.recv`` for the bytes still outstanding until ``n``
    bytes have been collected.

    Returns:
        A ``bytearray`` of length ``n``, or ``None`` if ``recv`` returns an
        empty result before ``n`` bytes were collected.
    """
    buf = bytearray()
    remaining = n
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            # Nothing more will arrive; report the short read as None.
            return None
        buf += chunk
        remaining = n - len(buf)
    return buf
def recv_msg(sock):
    """Receive one length-prefixed message from ``sock``.

    Reads a 4-byte unsigned big-endian length header with ``recvall``, then
    reads that many payload bytes the same way.

    Returns:
        Whatever ``recvall`` returns for the payload, or ``None`` when the
        header could not be read.
    """
    header = recvall(sock, 4)
    if not header:
        return None
    # Decode the payload size announced by the header.
    (payload_len,) = struct.unpack('>I', header)
    return recvall(sock, payload_len)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Helper Functions for sending large data through sockets | |
# https://stackoverflow.com/questions/17667903/python-socket-receive-large-amount-of-data | |
import struct | |
# Maximum number of payload bytes handed to the socket per send call.
PACK_SIZE = 4096


async def send_msg(loop, sock, msg):
    """Send ``msg`` over ``sock`` as one length-prefixed frame.

    The frame is a 4-byte unsigned big-endian length followed by the payload,
    which is written in pieces of at most ``PACK_SIZE`` bytes. Every write
    goes through ``loop.sock_sendall`` so the coroutine never calls a
    synchronous socket method.

    Args:
        loop: The asyncio event loop performing the socket I/O.
        sock: The socket to write to; asyncio documents that sockets passed
            to ``sock_sendall`` must be non-blocking.
        msg: The payload, a bytes-like object.

    Raises:
        struct.error: If ``len(msg)`` does not fit in 4 bytes.
        OSError: Propagated from the socket on send failure.
    """
    dlen = len(msg)
    # Await the header too: a plain sock.send() here could raise
    # BlockingIOError or write only part of the header.
    await loop.sock_sendall(sock, struct.pack('>I', dlen))
    # Slicing a memoryview hands out chunks without copying the payload.
    view = memoryview(msg)
    for start in range(0, dlen, PACK_SIZE):
        await loop.sock_sendall(sock, view[start:start + PACK_SIZE])
async def recvall(loop, sock, n):
    """Read exactly ``n`` bytes from ``sock`` using ``loop.sock_recv``.

    Keeps awaiting ``loop.sock_recv`` for the bytes still outstanding until
    ``n`` bytes have been collected.

    Returns:
        A ``bytearray`` of length ``n``, or ``None`` if ``sock_recv`` yields
        an empty result before ``n`` bytes were collected.
    """
    buf = bytearray()
    remaining = n
    while remaining > 0:
        chunk = await loop.sock_recv(sock, remaining)
        if not chunk:
            # Nothing more will arrive; report the short read as None.
            return None
        buf += chunk
        remaining = n - len(buf)
    return buf
async def recv_msg(loop, sock):
    """Receive one length-prefixed message from ``sock``.

    Awaits ``recvall`` for a 4-byte unsigned big-endian length header, then
    awaits it again for that many payload bytes.

    Returns:
        Whatever ``recvall`` returns for the payload, or ``None`` when the
        header could not be read.
    """
    header = await recvall(loop, sock, 4)
    if not header:
        return None
    # Decode the payload size announced by the header.
    (payload_len,) = struct.unpack('>I', header)
    return await recvall(loop, sock, payload_len)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment