# Created November 13, 2013 01:29
# Save hoffrocket/7442032 to your computer and use it in GitHub Desktop.
# Process bytes in a forked process from Python.
# This file contains hidden or bidirectional Unicode text that may be
# interpreted or compiled differently than what appears below. To review,
# open the file in an editor that reveals hidden Unicode characters.
# Learn more about bidirectional Unicode characters.
#!/usr/bin/env python
"""Stream a file's bytes to a child process over a multiprocessing Pipe.

The child acknowledges every chunk it receives. The destination path and
user it is handed are only echoed for now; the upload step itself is a
placeholder.
"""
import sys
from multiprocessing import Process, Pipe

# Seconds to block in Connection.poll() waiting for the other process
# (used by both the worker loop and the parent's wait for confirmations).
POLL_TIMEOUT = 5
def hdfs_hadoopbin_upload(conn, dstpath, hdfs_user, poll_timeout=None):
    """Receive byte chunks over ``conn`` and acknowledge each one.

    Meant to run as the target of a child ``Process``. The actual upload is
    still a placeholder: ``dstpath`` and ``hdfs_user`` are only printed.

    :param conn: duplex ``multiprocessing`` Connection. Chunks arrive as
        ``send_bytes`` messages, and one text confirmation is sent back per
        chunk, e.g. ``"got chunk 1 of 3 bytes"``.
    :param dstpath: destination path (currently only echoed).
    :param hdfs_user: user to upload as (currently only echoed).
    :param poll_timeout: seconds to wait for the next chunk; ``None`` (the
        default) uses the module-level ``POLL_TIMEOUT``.

    The loop ends when no chunk arrives within the timeout, or when the
    other end of the pipe has been closed (EOF). ``conn`` is always closed
    before returning.
    """
    if poll_timeout is None:
        poll_timeout = POLL_TIMEOUT
    print("Uploading to %s as %s" % (dstpath, hdfs_user))
    chunk_count = 0
    try:
        # wait for bytes from the parent process
        while conn.poll(poll_timeout):
            try:
                chunk = conn.recv_bytes()
            except EOFError:
                # poll() also reports "ready" once the peer end is closed;
                # treat that as end of stream rather than crashing.
                break
            # TODO: do stuff with the bytes (the actual upload)
            chunk_count += 1
            # send a confirmation back to the parent process
            conn.send("got chunk %d of %d bytes" % (chunk_count, len(chunk)))
    finally:
        conn.close()
if __name__ == '__main__':
    # Stream the file named on the command line to a worker process in
    # 1 MiB chunks, printing each confirmation the worker sends back.
    if len(sys.argv) < 2:
        sys.exit("usage: %s PATH_TO_READ" % sys.argv[0])
    path_to_read = sys.argv[1]
    chunk_size = 1024 * 1024
    # create a bidirectional Pipe: chunks go parent -> child,
    # confirmations come back child -> parent
    parent_conn, child_conn = Pipe(True)
    # launch function in a separate process
    p = Process(target=hdfs_hadoopbin_upload,
                args=(child_conn, "/datasets/foo/bar", "mapred"))
    p.start()
    # The child has its own handle now. Dropping the parent's copy means a
    # worker that has exited surfaces here as EOF / a broken pipe, instead
    # of send_bytes() eventually blocking forever on a full pipe buffer.
    child_conn.close()
    try:
        with open(path_to_read, "rb") as source:
            while True:
                chunk = source.read(chunk_size)
                if not chunk:
                    break
                parent_conn.send_bytes(chunk)
                # wait for confirmation from the child process
                if parent_conn.poll(POLL_TIMEOUT):
                    print(parent_conn.recv())
                else:
                    print("got a timeout waiting for process")
    finally:
        # release our end even if opening/reading the file or sending fails
        parent_conn.close()
    print("done with file read")
    p.join()
# Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment.