Last active
May 27, 2024 17:03
-
-
Save NicolasT/4519146 to your computer and use it in GitHub Desktop.
Using the 'splice' syscall from Python: this demonstration transfers the output of a process to a socket client using zero-copy transfers. See 'splice.py'. Usage: run 'python splice.py' in one console, then e.g. 'nc localhost 9009' in another. 'nonblocking.py' demonstrates using 'splice' with non-blocking IO.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
Demonstration of using `splice` with non-blocking IO.

A single client is accepted on a TCP socket, 'slowcat.py' is started as a
subprocess, and everything it writes to its stdout pipe is spliced into the
client socket. Both descriptors are switched to non-blocking mode and
`select` is used to wait until they are ready before each splice call.

Lots of code is similar to 'splice.py', take a look at that module for more
documentation.
'''
import os | |
import os.path | |
import errno | |
import fcntl | |
import socket | |
import select | |
import subprocess | |
import splice | |
def set_nonblock(fd): #pylint: disable-msg=C0103
    '''Put the file descriptor `fd` into non-blocking mode.

    The current file status flags are read back first and only O_NONBLOCK
    is OR'ed in, so every other flag on the descriptor is preserved.
    '''
    current = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
    fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
def _wait_until_ready(pipe_fd, conn_fd):
    '''Block until `pipe_fd` has been seen readable and `conn_fd` writable.

    Each descriptor is dropped from the select() sets as soon as it has been
    reported ready once, so this returns after both have been seen ready
    (not necessarily in the same select() call).

    This read/write-set and select code is extremely bare-bone; it is not
    how you should implement a 'serious' event-loop. You shouldn't
    implement your own event-loop anyway most likely, there are tons of
    good ones (using different approaches) out there.
    '''
    read_set = [pipe_fd]
    write_set = [conn_fd]
    while read_set or write_set:
        readable_set, writable_set, _ = select.select(read_set, write_set, [])
        if pipe_fd in readable_set:
            read_set = []
        if conn_fd in writable_set:
            write_set = []


def _splice_nonblocking(pipe_fd, conn_fd):
    '''Splice from `pipe_fd` into `conn_fd` until EOF; return bytes moved.

    Both descriptors are switched to non-blocking mode first. EAGAIN /
    EWOULDBLOCK from splice means one of the descriptors would block after
    all, so we wait for readiness again and retry. Any other error
    propagates as IOError.
    '''
    set_nonblock(pipe_fd)
    set_nonblock(conn_fd)
    print('Will splice data from FD %d to %d' % (pipe_fd, conn_fd))

    # Transfer parameters
    transferred = 0
    chunksize = 32 * 1024 * 1024
    flags = \
        splice.SPLICE_F_MOVE | splice.SPLICE_F_MORE | splice.SPLICE_F_NONBLOCK

    while True:
        _wait_until_ready(pipe_fd, conn_fd)
        try:
            done = splice.splice(pipe_fd, None, conn_fd, None, chunksize, flags)
        except IOError as exc:
            if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                # One of the FDs would block again: wait and retry.
                continue
            raise
        if done == 0:
            # Write-end of the source pipe is closed: no more data.
            break
        transferred += done
        print('Bytes transferred: %d' % transferred)
    return transferred


def main(host, port, path):
    '''Server implementation.

    Accepts a single TCP client on (`host`, `port`), runs
    ``slowcat.py path`` as a subprocess and splices everything it writes to
    stdout into the client socket using non-blocking descriptors. The
    server socket, client socket and subprocess pipe are released even if
    the transfer fails.
    '''
    import sys  # only needed to locate the running interpreter

    # Set up server socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, port))
        sock.listen(1)

        # Wait for client
        conn, addr = sock.accept()
        try:
            print('Connection from: %s' % (addr,))

            # Launch the subprocess with the interpreter running this script,
            # and look slowcat.py up next to this file, so neither $PATH nor
            # the current working directory decides what gets executed.
            here = os.path.dirname(os.path.abspath(__file__))
            argv = [sys.executable, os.path.join(here, 'slowcat.py'), path]
            proc = subprocess.Popen(argv, close_fds=True,
                                    stdout=subprocess.PIPE)
            try:
                _splice_nonblocking(proc.stdout.fileno(),
                                    conn.fileno())  #pylint: disable-msg=E1101
            finally:
                # Close our read end so a still-running child is not left
                # writing into a pipe nobody reads, then reap it.
                proc.stdout.close()
                proc.wait()
        finally:
            conn.close()
    finally:
        sock.close()
if __name__ == '__main__':
    # Serve this very file (via slowcat.py) to the first client that
    # connects on port 9009, listening on all interfaces.
    main(host='', port=9009, path=os.path.abspath(__file__))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
A lazy cat which sleeps in-between lines.

Copies the file named by the first command-line argument to stdout one line
at a time, flushing stdout and then sleeping after every line.
'''
import sys | |
import time | |
# Seconds to pause after writing each line.
SLEEP_TIME = 0.05


def main(fd, sleep_time=None): #pylint: disable-msg=W0621,C0103
    '''Copy every line of `fd` to stdout, pausing after each one.

    Args:
        fd: An iterable of text lines, e.g. an open file object.
        sleep_time: Seconds to sleep after each line. Defaults to
            SLEEP_TIME when omitted or None.
    '''
    if sleep_time is None:
        sleep_time = SLEEP_TIME
    for line in fd:
        # sys.stdout.write emits the line verbatim on both Python 2 and 3,
        # unlike the Python-2-only `print line,` statement.
        sys.stdout.write(line)
        # Flush so the reader of our stdout sees every line immediately.
        sys.stdout.flush()
        time.sleep(sleep_time)
if __name__ == '__main__':
    # Slowly echo the file named on the command line.
    with open(sys.argv[1], 'r') as source:
        main(source)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
Demonstration of using splice from Python

This code starts a TCP/IP server, waits for a connection, and once a connection
has been made, launches a subprocess ('cat' of this file). Then, it transfers
everything this subprocess outputs on stdout to the socket client. When no more
data is available, everything is shut down.

The server is fully blocking, even though splice(2) supports non-blocking
execution. To use that, set any pipes to non-blocking mode (using fcntl or
similar), call splice with the `SPLICE_F_NONBLOCK` flag set, and integrate the
read/write readiness of the FDs with your main loop and its
select/poll/epoll/... calls. This is very application/framework/library-specific,
so I don't bother with it in this code. Note that you might need to wrap calls
to splice in an exception handler to catch EWOULDBLOCK, EAGAIN and similar
errors.

Bindings to splice(2) are made using ctypes.

This code is public domain as fully as possible in any applicable law, etc. etc.
etc.

It comes without warranty blah blah blah do whatever you want with it but don't
blame me if anything breaks.

If you find any errors, please let me know!
'''
import os | |
import os.path | |
import errno | |
import socket | |
import subprocess | |
import ctypes | |
import ctypes.util | |
def make_splice():
    '''Set up a splice(2) wrapper.

    Loads the C library through ctypes, declares the C prototype of
    ``splice`` and returns a Python function that calls it (see the inner
    function's docstring for its contract).
    '''
    # Load libc. use_errno=True makes ctypes save errno after each foreign
    # call so it can be read back with ctypes.get_errno().
    libc_name = ctypes.util.find_library('c')
    libc = ctypes.CDLL(libc_name, use_errno=True)

    # Get a handle to the 'splice' call
    c_splice = libc.splice

    # loff_t is a signed 64-bit integer (long long) on every Linux
    # architecture, not only x86_64; declare it as such so the ctypes
    # prototype matches the C one.
    c_loff_t = ctypes.c_int64
    c_loff_t_p = ctypes.POINTER(c_loff_t)

    # ssize_t splice(int fd_in, loff_t *off_in, int fd_out,
    #                loff_t *off_out, size_t len, unsigned int flags)
    c_splice.argtypes = [
        ctypes.c_int, c_loff_t_p,
        ctypes.c_int, c_loff_t_p,
        ctypes.c_size_t,
        ctypes.c_uint
    ]
    c_splice.restype = ctypes.c_ssize_t

    # Clean-up closure names. Yup, useless nit-picking.
    del libc
    del libc_name
    del c_loff_t_p

    # pylint: disable-msg=W0621,R0913
    def splice(fd_in, off_in, fd_out, off_out, len_, flags):
        '''Wrapper for splice(2)

        See the syscall documentation ('man 2 splice') for more information
        about the arguments and return value.

        `off_in` and `off_out` can be `None`, which is equivalent to `NULL`.
        An integer offset is passed by reference to a temporary loff_t; the
        offset updated by the kernel is NOT reported back to the caller.

        Returns the number of bytes spliced; 0 means end of input.

        If the call to `splice` fails (i.e. returns -1), an `IOError` is
        raised with the appropriate `errno` (on Python 3 `IOError` is an
        alias of `OSError`), unless the error is `EINTR`, which results in
        the call being retried.
        '''
        c_off_in = \
            ctypes.byref(c_loff_t(off_in)) if off_in is not None else None
        c_off_out = \
            ctypes.byref(c_loff_t(off_out)) if off_out is not None else None

        # For handling EINTR...
        while True:
            res = c_splice(fd_in, c_off_in, fd_out, c_off_out, len_, flags)
            if res == -1:
                errno_ = ctypes.get_errno()
                # Try again on EINTR
                if errno_ == errno.EINTR:
                    continue
                raise IOError(errno_, os.strerror(errno_))
            return res

    return splice
# Build the public wrapper once at import time, then drop the factory so only
# `splice` itself is exported.
splice = make_splice() #pylint: disable-msg=C0103
del make_splice

# Values for the `flags` argument of splice(2), from bits/fcntl.h.
# Each is a single bit, so they can be OR'ed together.
SPLICE_F_MOVE = 1 << 0
SPLICE_F_NONBLOCK = 1 << 1
SPLICE_F_MORE = 1 << 2
SPLICE_F_GIFT = 1 << 3
def _splice_to_eof(pipe_fd, conn_fd):
    '''Splice everything from `pipe_fd` into `conn_fd`; return bytes moved.'''
    transferred = 0
    # 32MB chunks
    chunksize = 32 * 1024 * 1024

    # If you know the number of bytes to be transferred upfront, you could
    # change this into a 'while todo > 0', pass 'todo' to splice instead of the
    # arbitrary 'chunksize', and error out if splice returns 0 before all bytes
    # are transferred.
    # In this example, we just transfer as much as possible until the write-end
    # closes the pipe.
    while True:
        done = splice(pipe_fd, None, conn_fd, None, chunksize,
                      SPLICE_F_MOVE | SPLICE_F_MORE)
        if done == 0:
            # Write-end of the source pipe has gone, no more data will be
            # available
            break
        transferred += done
        print('Bytes transferred: %d' % transferred)
    return transferred


def main(host, port, path):
    '''Server implementation.

    Accepts a single TCP client on (`host`, `port`), runs ``cat path`` and
    splices its stdout into the client socket until EOF. The server socket,
    client socket and subprocess pipe are released even if the transfer
    fails.
    '''
    # Set up a simple server socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, port))
        sock.listen(1)

        # Single accept, we'll clean up once this one connection has been
        # handled. Yes, this is a very stupid server indeed.
        conn, addr = sock.accept()
        try:
            print('Connection from: %s' % (addr,))

            # Set up some subprocess which produces some output which should
            # be transferred to the client.
            # In this case, we just 'cat' the given file.
            argv = ['cat', path]
            # stdin and stderr are inherited from this process; a serious
            # application might want to redirect them as well.
            proc = subprocess.Popen(argv, close_fds=True,
                                    stdout=subprocess.PIPE)
            try:
                # We need the integer FDs for splice to work
                pipe_fd = proc.stdout.fileno()
                conn_fd = conn.fileno() #pylint: disable-msg=E1101
                print('Will splice data from FD %d to %d' % (pipe_fd, conn_fd))
                _splice_to_eof(pipe_fd, conn_fd)
            finally:
                # Close our read end, then wait for the subprocess (on the
                # normal path it has already finished by now).
                proc.stdout.close()
                proc.wait()
        finally:
            conn.close()
    finally:
        sock.close()
if __name__ == '__main__':
    # Serve this very source file to the first client that connects on
    # port 9009, listening on all interfaces.
    main(host='', port=9009, path=os.path.abspath(__file__))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment