Last active
August 5, 2019 17:36
-
-
Save BryanCutler/b4119b64a297ea4c0e6a28547a89b039 to your computer and use it in GitHub Desktop.
TensorFlow Arrow Blog Part 10 - Serve CSV Data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def serve_csv_data(ip_addr, port_num, directory):
    """
    Serve Arrow record batches over a TCP socket, read from a directory of
    CSV files.

    Binds a listening socket to ``(ip_addr, port_num)`` and then accepts
    clients one at a time, forever. Each client receives one complete pass
    over the batches yielded by ``read_and_process_dir(directory)``, written
    as an Arrow record batch stream, after which its connection is closed.

    Args:
        ip_addr: Address to bind the listening socket to.
        port_num: TCP port to bind the listening socket to.
        directory: Passed unchanged to ``read_and_process_dir``.

    This function does not return normally. Any exception raised while
    binding, accepting or streaming propagates to the caller; the listening
    socket and any open client connection are closed before it does.
    """
    # The `with` block guarantees the listening socket is released when an
    # exception (including KeyboardInterrupt) ends the serve loop.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((ip_addr, port_num))
        sock.listen(1)

        # Serve forever, each client will get one iteration over data
        while True:
            conn, _ = sock.accept()

            # Context managers exit in reverse order: the buffered file
            # wrapper is flushed and closed first, then the connection.
            # Both are closed even if closing the writer below raises.
            with conn, conn.makefile(mode='wb') as outfile:
                writer = None
                try:
                    # Read directory and iterate over each batch in each file
                    for batch in read_and_process_dir(directory):
                        # The stream writer needs a schema, so it is created
                        # lazily from the first batch. If there are no
                        # batches, nothing is written to the client.
                        if writer is None:
                            writer = pa.RecordBatchStreamWriter(
                                outfile, batch.schema)

                        # Write the batch to the client stream
                        writer.write_batch(batch)
                finally:
                    # Closing the writer finishes the Arrow stream.
                    if writer is not None:
                        writer.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment