@hoffrocket
Created November 13, 2013 01:29
process bytes in a forked process from python
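The script below forks a worker with multiprocessing.Process and streams a local file to it over a duplex Pipe in 1 MiB chunks. The parent pushes raw bytes with send_bytes() and waits, with a 5-second poll timeout, for a pickled acknowledgement from the child; the child loops on poll()/recv_bytes() until no more data arrives. The actual HDFS upload is left as a stub (the "do stuff with the bytes" comment).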
#!/usr/bin/env python
import sys
from multiprocessing import Process, Pipe

POLL_TIMEOUT = 5


def hdfs_hadoopbin_upload(conn, dstpath, hdfs_user):
    print("Uploading to %s as %s" % (dstpath, hdfs_user))
    chunk_count = 0
    try:
        # wait for bytes from the parent process; exit once poll() times out
        while conn.poll(POLL_TIMEOUT):
            chunk = conn.recv_bytes()
            # do stuff with the bytes, then
            # send a confirmation back to the parent process
            chunk_count += 1
            conn.send("got chunk %d of %d bytes" % (chunk_count, len(chunk)))
    except EOFError:
        # the parent closed its end of the pipe: no more chunks are coming
        pass
    finally:
        conn.close()


if __name__ == '__main__':
    path_to_read = sys.argv[1]
    # create a bidirectional Pipe
    parent_conn, child_conn = Pipe(True)
    # launch the handler in a separate process
    p = Process(target=hdfs_hadoopbin_upload,
                args=(child_conn, "/datasets/foo/bar", "mapred"))
    p.start()
    with open(path_to_read, "rb") as infile:
        while True:
            chunk = infile.read(1 * 1024 * 1024)  # read 1 MiB at a time
            if chunk:
                parent_conn.send_bytes(chunk)
                # wait for confirmation from the child process
                if parent_conn.poll(POLL_TIMEOUT):
                    print(parent_conn.recv())
                else:
                    print("got a timeout waiting for process")
            else:
                # EOF: close our end so the child can wind down
                parent_conn.close()
                break
    print("done with file read")
    p.join()
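Assuming the script is saved as, say, stream_to_child.py (the gist's filename is not shown here, so that name is hypothetical), a run looks like:

./stream_to_child.py /path/to/local/file

which prints the child's "Uploading to /datasets/foo/bar as mapred" banner, one "got chunk N of M bytes" acknowledgement per 1 MiB chunk, and finally "done with file read". Because Pipe(True) returns a duplex pair, the same connection carries data down and acknowledgements back; the child notices the parent is done either via the poll() timeout or via EOFError once the parent's end is closed.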