Created
          February 28, 2019 17:55 
        
      - 
      
- 
        Save emir-munoz/05410fd51ab9899b98865b7be796aec7 to your computer and use it in GitHub Desktop. 
    Streaming pandas DataFrame to/from S3 with on-the-fly processing and GZIP compression
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | def s3_to_pandas(client, bucket, key, header=None): | |
| # get key using boto3 client | |
| obj = client.get_object(Bucket=bucket, Key=key) | |
| gz = gzip.GzipFile(fileobj=obj['Body']) | |
| # load stream directly to DF | |
| return pd.read_csv(gz, header=header, dtype=str) | |
def s3_to_pandas_with_processing(client, bucket, key, header=None):
    """Load a gzip-compressed CSV object from S3, replacing every '?' with a space.

    The whole object is decompressed and decoded as UTF-8 in memory, cleaned,
    and then parsed into a DataFrame.

    Args:
        client: S3 client exposing ``get_object(Bucket=..., Key=...)``
            (e.g. a boto3 S3 client) whose response has a ``'Body'`` stream.
        bucket: Name of the bucket holding the object.
        key: Key of the gzip-compressed CSV object.
        header: Forwarded to ``pandas.read_csv``; ``None`` (the default)
            treats every line as data, ``0`` uses the first line as
            column names.

    Returns:
        A DataFrame in which every column is read as ``str``.
    """
    obj = client.get_object(Bucket=bucket, Key=key)
    body = obj['Body']
    try:
        with gzip.GzipFile(fileobj=body) as gz:
            text = gz.read().decode('utf-8')
    finally:
        # GzipFile.close() leaves the wrapped stream open; release it here.
        body.close()
    # '?' can never match '\n', so one replace over the whole text equals
    # splitting into lines, replacing per line, and re-joining.
    cleaned = text.replace('?', ' ')
    return pd.read_csv(io.StringIO(cleaned), header=header, dtype=str)
def pandas_to_s3(df, client, bucket, key):
    """Serialize a DataFrame to gzip-compressed CSV and upload it to S3.

    The CSV is written without the index but with a header row.
    NOTE: the readers in this file default to ``header=None``, so a file
    written here and read back with the defaults will include the header
    line as a data row; pass ``header=0`` when reading to round-trip.

    Args:
        df: DataFrame to upload.
        client: S3 client exposing ``put_object(Bucket=..., Key=..., Body=...)``
            (e.g. a boto3 S3 client).
        bucket: Destination bucket name.
        key: Destination object key.

    Returns:
        Whatever ``client.put_object`` returns (previously discarded).
    """
    # to_csv with no path returns the CSV text directly.
    csv_text = df.to_csv(index=False)
    body = gzip.compress(csv_text.encode('utf-8'))
    return client.put_object(Bucket=bucket, Key=key, Body=body)
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment