Skip to content

Instantly share code, notes, and snippets.

@kemingy
Last active June 17, 2024 08:49
Show Gist options
  • Save kemingy/2250877ae4acb70e8f91a0b6521bce71 to your computer and use it in GitHub Desktop.
Save kemingy/2250877ae4acb70e8f91a0b6521bce71 to your computer and use it in GitHub Desktop.
unix domain socket vs. multiprocessing transfer time
import socket
import numpy as np
import pickle
import time
import struct
# Filesystem path of the Unix domain socket this client connects to.
socket_path = './uds.sock'
# Benchmark payload: a 224 x 224 x 3 array of random floats that is pickled
# and sent on every round trip.
matrix = np.random.random(size=(224, 224, 3))
def recvall(sock, n):
    """Receive exactly ``n`` bytes from ``sock``.

    A stream socket may deliver fewer bytes than requested per call, so this
    keeps reading until the whole message has arrived.

    Args:
        sock: A connected socket object supporting ``recv_into``.
        n: Number of bytes to read. Values <= 0 yield an empty buffer
            without touching the socket.

    Returns:
        A ``bytearray`` holding exactly ``n`` bytes, or ``None`` if the peer
        closed the connection before ``n`` bytes arrived (any partial data is
        discarded).
    """
    buf = bytearray(max(n, 0))
    received = 0
    # Read straight into the pre-sized buffer instead of allocating a new
    # bytes object per packet and copying it over.
    with memoryview(buf) as view:
        while received < n:
            got = sock.recv_into(view[received:])
            if not got:
                # Zero bytes from a blocking stream socket means EOF.
                return None
            received += got
    return buf
# Benchmark driver: connect to the Unix domain socket, then perform 10000
# round trips. Each round trip pickles ``matrix``, sends it with a 4-byte
# big-endian length prefix, reads the length-prefixed reply and unpickles it.
# A BrokenPipeError restarts the whole attempt on a fresh connection.
while True:
    try:
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.connect(socket_path)
            t0 = time.time()
            for _ in range(10000):
                # Serialisation is deliberately inside the loop: it is part
                # of the cost being measured.
                msg = pickle.dumps(matrix)
                sock.sendall(struct.pack('!i', len(msg)))
                sock.sendall(msg)
                # A bare recv(4) may return fewer than 4 bytes on a stream
                # socket, so the header is read with recvall as well.
                length_bytes = recvall(sock, 4)
                if length_bytes is None:
                    raise ConnectionError(
                        'server closed the connection before the reply header')
                length = struct.unpack('!i', length_bytes)[0]
                recv = recvall(sock, length)
                if recv is None:
                    raise ConnectionError(
                        'server closed the connection before the reply body')
                # NOTE: pickle.loads is only acceptable here because the peer
                # is the local echo server; never unpickle untrusted data.
                data = pickle.loads(recv)
            print('Time: ', time.time() - t0)
        print('close client socket')
        break
    except BrokenPipeError:
        continue
import pickle
import numpy as np
from time import time
import multiprocessing as mp
from multiprocessing import shared_memory
# Number of iterations each benchmark below performs.
epoch = 5000
# Benchmark payload: a 224 x 224 x 3 array of random floats.
mat = np.random.random(size=(224, 224, 3))
def foo(x):
    """Return ``x`` unchanged.

    Used as the pool task so that the measured time is dominated by moving
    the argument to the worker and the result back.
    """
    result = x
    return result
# Pool benchmark: submit the payload to a worker pool ``epoch`` times, one
# single-element map call per iteration, and report the elapsed wall time.
t0 = time()
with mp.Pool() as workers:
    for _round in range(epoch):
        res = workers.map(foo, [mat])
elapsed = time() - t0
print('Pool Time:', elapsed)
def bar(queue, replies=None):
    """Echo worker: forward every item read from ``queue`` to ``replies``.

    Args:
        queue: Queue the worker reads requests from.
        replies: Queue the worker writes results to. When omitted the worker
            writes back to ``queue`` itself, which keeps the old
            single-queue call signature working.

    Runs forever; intended to be used as the target of a daemon process.
    """
    if replies is None:
        replies = queue
    while True:
        replies.put(queue.get())


# Single-process benchmark. Requests and replies travel on separate queues:
# with one shared queue, whichever process calls get() first receives the
# item, so the parent could read back its own message (or the worker its own
# reply) and the loop would not measure a real round trip.
queue = mp.SimpleQueue()
replies = mp.SimpleQueue()
pro = mp.Process(target=bar, args=(queue, replies), daemon=True)
pro.start()
t0 = time()
for _ in range(epoch):
    queue.put(mat)
    res = replies.get()
print('Single Process Time:', time() - t0)
# Size the shared segment from one pickled copy of the payload.
data = pickle.dumps(mat)
shm = shared_memory.SharedMemory(create=True, size=len(data))


def foobar(shm, main, other):
    """Echo worker for the shared-memory benchmark.

    Waits for ``other`` to be set, unpickles the object stored in ``shm``,
    pickles it back into the same segment, then clears ``other`` and sets
    ``main`` to hand control back to the parent.

    Args:
        shm: Shared memory segment holding the pickled payload.
        main: Event set by this worker when the reply is ready.
        other: Event set by the parent when a request is ready.

    Runs forever; intended to be used as the target of a daemon process.
    """
    while True:
        other.wait()
        data = pickle.loads(shm.buf)
        payload = pickle.dumps(data)
        # Write only the payload's own length: the segment may be larger than
        # requested, and a full-slice assignment needs an exact size match.
        shm.buf[:len(payload)] = payload
        other.clear()
        main.set()


main, other = mp.Event(), mp.Event()
process = mp.Process(target=foobar, args=(shm, main, other), daemon=True)
process.start()
try:
    t0 = time()
    for _ in range(epoch):
        payload = pickle.dumps(mat)
        shm.buf[:len(payload)] = payload
        other.set()
        main.wait()
        pickle.loads(shm.buf)
        main.clear()
    print('SharedMemory Time', time() - t0)
finally:
    # Stop the worker before tearing the segment down, and always release
    # the segment even if the benchmark loop raised.
    process.terminate()
    process.join()
    shm.close()
    shm.unlink()
import socket
from time import time
import numpy as np
from pyarrow import plasma
# Plasma benchmark client: store the payload in the plasma object store, send
# only the object ID over the Unix domain socket, read the ID that comes
# back, and fetch the object again through plasma.
# NOTE(review): pyarrow.plasma appears to be unavailable in recent pyarrow
# releases - confirm the pinned pyarrow version before running.
client = plasma.connect('/tmp/plasma')
socket_path = '/tmp/ipc.socket'
shape = [3, 224, 224]
data = np.random.rand(*shape)
epoch = 10000
# Number of bytes exchanged per object ID on the socket.
ID_SIZE = 20
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
    sock.connect(socket_path)
    t0 = time()
    for _ in range(epoch):
        oid = client.put(data.tobytes())
        sock.sendall(oid.binary())
        # MSG_WAITALL blocks until the full ID has arrived; a plain recv may
        # return a partial ID on a stream socket.
        bid = sock.recv(ID_SIZE, socket.MSG_WAITALL)
        if len(bid) != ID_SIZE:
            raise ConnectionError(
                'server closed the connection before a full object ID arrived')
        oid = plasma.ObjectID(bid)
        data = client.get(oid)
        # reshape returns the correctly shaped view directly; the previous
        # in-place resize() returned None that was assigned and discarded.
        data = np.frombuffer(data, dtype=np.float64).reshape(shape)
        client.delete([oid])
    print('Time:', time() - t0)
import socket
import pathlib
import numpy as np
from pyarrow import plasma
# Plasma benchmark server: accepts connections on a Unix domain socket and
# echoes every object ID it receives back to the sender.
client = plasma.connect('/tmp/plasma')
socket_path = '/tmp/ipc.socket'
# Number of bytes exchanged per object ID on the socket.
ID_SIZE = 20
path = pathlib.Path(socket_path)
# bind() fails if the socket file is left over from a previous run.
if path.exists():
    print('delete socket file')
    path.unlink()
# UDS
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
    sock.bind(socket_path)
    sock.listen(1)
    while True:
        print('waiting for connection')
        conn, client_name = sock.accept()
        with conn:
            print('connection from', client_name)
            while True:
                # MSG_WAITALL blocks until a whole ID has arrived; a plain
                # recv may return a partial ID, which ObjectID cannot accept.
                bid = conn.recv(ID_SIZE, socket.MSG_WAITALL)
                if len(bid) < ID_SIZE:
                    # Empty or truncated read: the peer has gone away.
                    print('EOF')
                    break
                oid = plasma.ObjectID(bid)
                conn.sendall(oid.binary())
package main
import (
"log"
"net"
"os"
"io"
"encoding/binary"
)
// sockAddr is the filesystem path of the Unix domain socket the server
// listens on.
const sockAddr = "./uds.sock"

// main starts a Unix domain socket echo server. Each accepted connection is
// served on its own goroutine by echo.
func main() {
	// A socket file left over from a previous run would make Listen fail.
	if _, err := os.Stat(sockAddr); err == nil {
		if err := os.Remove(sockAddr); err != nil {
			log.Printf("Remove socket file error: %v", err)
		}
	}

	sock, err := net.Listen("unix", sockAddr)
	if err != nil {
		log.Fatal("Listen error: ", err)
	}
	defer sock.Close()

	for {
		conn, err := sock.Accept()
		if err != nil {
			log.Fatal("Accept error: ", err)
		}
		log.Println("connection accepted.")
		go echo(conn)
	}
}

// echo reads length-prefixed messages from conn and writes each one back
// unchanged. A message is a 4-byte big-endian length followed by that many
// payload bytes. It returns when the peer closes the connection or an I/O
// error occurs; the connection is always closed on return.
func echo(conn net.Conn) {
	defer conn.Close()

	lengthBuf := make([]byte, 4)
	for {
		// io.ReadFull keeps reading until the whole header has arrived; a
		// single Read may legitimately return fewer than 4 bytes.
		if _, err := io.ReadFull(conn, lengthBuf); err != nil {
			if err == io.EOF {
				log.Println("end of file")
			} else {
				log.Println("read buffer error", err)
			}
			return
		}
		length := binary.BigEndian.Uint32(lengthBuf)

		// The length comes from the peer and is trusted here; this server is
		// meant for a local benchmark client only.
		dataBuf := make([]byte, length)
		if _, err := io.ReadFull(conn, dataBuf); err != nil {
			// Includes a truncated payload (io.ErrUnexpectedEOF): never echo
			// a partially filled buffer.
			log.Println("read buffer error", err)
			return
		}

		// Errors on one connection only end that connection; they must not
		// take the whole server down.
		if _, err := conn.Write(lengthBuf); err != nil {
			log.Println("Send to client error: ", err)
			return
		}
		if _, err := conn.Write(dataBuf); err != nil {
			log.Println("Send to client error: ", err)
			return
		}
	}
}
import socket
from time import time
import numpy as np
from multiprocessing import shared_memory
# Shared-memory benchmark client: write the payload into a freshly created
# shared memory segment, send only the segment's random name over the Unix
# domain socket, read the name that comes back, and load the array from the
# segment with that name.
socket_path = '/tmp/ipc.socket'
shape = [3, 224, 224]
data = np.random.rand(*shape)
epoch = 10000
# Number of raw bytes per segment name sent on the socket (hex-encoded to
# form the actual shared memory name).
NAME_SIZE = 15
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
    sock.connect(socket_path)
    t0 = time()
    for _ in range(epoch):
        name = np.random.bytes(NAME_SIZE)
        shm = shared_memory.SharedMemory(
            name=name.hex(), create=True, size=data.nbytes)
        # Write only data.nbytes: the segment may be larger than requested,
        # and a full-slice assignment needs an exact size match.
        shm.buf[:data.nbytes] = data.tobytes()
        sock.sendall(name)
        shm.close()
        # another one
        # MSG_WAITALL blocks until the whole name has arrived; a plain recv
        # may return a partial name on a stream socket.
        name = sock.recv(NAME_SIZE, socket.MSG_WAITALL)
        if len(name) != NAME_SIZE:
            raise ConnectionError(
                'server closed the connection before a full name arrived')
        exist = shared_memory.SharedMemory(name=name.hex())
        try:
            # Copy out of the segment so the array stays valid after the
            # mapping is closed and the segment is removed.
            data = np.ndarray(
                shape, dtype=np.float64, buffer=exist.buf).copy()
        finally:
            exist.close()
            exist.unlink()
    print('Time:', time() - t0)
import numpy as np
from multiprocessing import shared_memory
import socket
import pathlib
# Echo server for the shared-memory benchmark: every chunk received on the
# Unix domain socket is sent straight back to the client.
socket_path = '/tmp/ipc.socket'
path = pathlib.Path(socket_path)
# Clear a socket file left behind by an earlier run before binding.
if path.exists():
    print('delete socket file')
    path.unlink()
# UDS
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
    sock.bind(socket_path)
    sock.listen(1)
    while True:
        print('waiting for connection')
        conn, client_name = sock.accept()
        with conn:
            print('connection from', client_name)
            # Keep echoing until recv returns an empty chunk, which marks
            # the end of the stream.
            for bid in iter(lambda: conn.recv(15), b''):
                conn.sendall(bid)
            print('EOF')
  • CPU: Intel(R) Xeon(R) Silver 4110 CPU @ 2.10GHz
  • System: Ubuntu 16.04
  • Python: 3.7.3
  • Go: 1.14.6
| Type | Time Cost | Epoch | Data Size (f64) |
| --- | --- | --- | --- |
| Unix Domain Socket (local file) | 21.60s | 10000 | 224 * 224 * 3 |
| Unix Domain Socket (/dev/shm) in docker container | 18.03s | 10000 | 224 * 224 * 3 |
| Unix Domain Socket (local file) in docker container | 18.38s | 10000 | 224 * 224 * 3 |
| Plasma (shared memory: 1,000,000,000) | 9.48s | 10000 | 224 * 224 * 3 |
| Python 3.8.3 SharedMemory | 7.81s | 10000 | 224 * 224 * 3 |
| Multiprocessing Pool | 24.65s | 2 * 5000 | 224 * 224 * 3 |
| Multiprocessing Process | 25.29s | 2 * 5000 | 224 * 224 * 3 |
| Multiprocessing SharedMemory (py3.8) | 12.58s | 2 * 5000 | 224 * 224 * 3 |
| `%time for _ in range(epoch):pickle.loads(pickle.dumps(mat))` | 7.72s | 10000 | 224 * 224 * 3 |
| Multiprocessing Pool | 9.43s | 2 * 50000 | 300 |
| Multiprocessing Process | 3.81s | 2 * 50000 | 300 |
| Multiprocessing SharedMemory (py3.8) | 4.34s | 2 * 50000 | 300 |
| Unix Domain Socket | 4.69s | 100000 | 300 |
| `%time for _ in range(epoch):pickle.loads(pickle.dumps(mat))` | 1.58s | 100000 | 300 |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment