Created
October 13, 2014 02:35
-
-
Save kfei/5c9222dad94c3e588fe7 to your computer and use it in GitHub Desktop.
A Python3 ZMQ file transfer example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# File Transfer model #3 | |
# | |
# In which the client requests each chunk individually, using | |
# command pipelining to give us a credit-based flow control. | |
import os | |
import sys | |
from threading import Thread | |
import zmq | |
def zpipe(ctx): | |
import binascii | |
"""build inproc pipe for talking to threads | |
mimic pipe used in czmq zthread_fork. | |
Returns a pair of PAIRs connected via inproc | |
""" | |
a = ctx.socket(zmq.PAIR) | |
b = ctx.socket(zmq.PAIR) | |
a.linger = b.linger = 0 | |
a.hwm = b.hwm = 1 | |
iface = "inproc://%s" % binascii.hexlify(os.urandom(8)) | |
a.bind(iface) | |
b.connect(iface) | |
return a,b | |
CHUNK_SIZE = 250000 | |
def client_thread(ctx, pipe): | |
dealer = ctx.socket(zmq.DEALER) | |
# libzmq 2/3 compatible sethwm | |
try: | |
dealer.sndhwm = dealer.rcvhwm = 1 | |
except AtttibuteError: | |
dealer.hwm = 1 | |
dealer.connect("tcp://127.0.0.1:6000") | |
total = 0 # Total bytes received | |
chunks = 0 # Total chunks received | |
while True: | |
# Ask for next chunk | |
if sys.version_info >= (3, 0): | |
dealer.send_multipart([ | |
"fetch".encode(), | |
total.to_bytes((total.bit_length() // 8) + 1, | |
byteorder='little'), | |
CHUNK_SIZE.to_bytes((CHUNK_SIZE.bit_length() // 8) + 1, | |
byteorder='little') | |
]) | |
else: | |
dealer.send_multipart([ | |
b"fetch", | |
b"%i" % total, | |
b"%i" % CHUNK_SIZE | |
]) | |
try: | |
chunk = dealer.recv() | |
except zmq.ZMQError as e: | |
if e.errno == zmq.ETERM: | |
return # Shutting down, quit | |
else: | |
raise | |
chunks += 1 | |
size = len(chunk) | |
total += size | |
if size < CHUNK_SIZE: | |
break # Last chunk received; exit | |
print("%i chunks received, %i bytes" % (chunks, total)) | |
pipe.send(b"OK") | |
# .split File server thread | |
# The server thread waits for a chunk request from a client, | |
# reads that chunk and sends it back to the client: | |
def server_thread(ctx): | |
file = open("testdata", "rb") | |
router = ctx.socket(zmq.ROUTER) | |
router.bind("tcp://*:6000") | |
while True: | |
# First frame in each message is the sender identity | |
# Second frame is "fetch" command | |
try: | |
msg = router.recv_multipart() | |
except zmq.ZMQError as e: | |
if e.errno == zmq.ETERM: | |
return # Shutting down, quit | |
else: | |
raise | |
identity, command, offset_str, chunksz_str = msg | |
assert command == b"fetch" | |
if sys.version_info >= (3, 0): | |
offset = int.from_bytes(offset_str, 'little') | |
chunksz = int.from_bytes(chunksz_str, 'little') | |
else: | |
offset = int(offset_str) | |
chunksz = int(chunksz_str) | |
# Read chunk of data from file | |
file.seek(offset, os.SEEK_SET) | |
data = file.read(chunksz) | |
# Send resulting chunk to client | |
router.send_multipart([identity, data]) | |
# The main task is just the same as in the first model. | |
# .skip | |
def main(): | |
# Start child threads | |
ctx = zmq.Context() | |
a, b = zpipe(ctx) | |
client = Thread(target=client_thread, args=(ctx, b)) | |
server = Thread(target=server_thread, args=(ctx,)) | |
client.start() | |
server.start() | |
# Loop until client tells us it's done | |
try: | |
print(a.recv()) | |
except KeyboardInterrupt: | |
pass | |
del a, b | |
ctx.term() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment