Skip to content

Instantly share code, notes, and snippets.

@andysingal
Last active May 2, 2023 04:11
Show Gist options
  • Select an option

  • Save andysingal/2e6ec558d81b4dc26f57287d4ad3ee37 to your computer and use it in GitHub Desktop.

Select an option

Save andysingal/2e6ec558d81b4dc26f57287d4ad3ee37 to your computer and use it in GitHub Desktop.
import os
from datetime import datetime, timedelta
from io import StringIO, BytesIO
from multiprocessing import Pool, cpu_count

import boto3
import pandas as pd
def read_csv_from_s3_multiprocessing(bucket_name, date_str, num_processes=1, batch_size=10):
    """Read all CSV objects in *bucket_name* dated from the day before
    *date_str* onward, in parallel batches, and return one DataFrame.

    After each batch the cumulative result is checkpointed to
    'logs/data_checkpoint.csv' so a long run can be resumed by inspection.

    Parameters
    ----------
    bucket_name : str
        Name of the S3 bucket to scan.
    date_str : str
        Reference date in '%Y-%m-%d' form; objects whose key prefix date is
        on or after the previous day are included.
    num_processes : int
        Worker count for the multiprocessing pool.
    batch_size : int
        Number of S3 objects dispatched to the pool per batch.

    Returns
    -------
    pandas.DataFrame
        All rows from the selected CSV files (empty, with the expected
        columns, when no object matches).

    Raises
    ------
    ValueError
        If an object key's first path segment is not a '%Y-%m-%d' date.
    """
    date_dt = datetime.strptime(date_str, '%Y-%m-%d').date() - timedelta(days=1)
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)
    # NOTE(review): assumes every key looks like 'YYYY-MM-DD/...'; any key
    # that does not will raise ValueError here — confirm bucket layout.
    objects = [obj for obj in bucket.objects.all()
               if datetime.strptime(obj.key.split('/')[0], '%Y-%m-%d').date() >= date_dt]
    # Ensure the checkpoint directory exists before the first write.
    os.makedirs('logs', exist_ok=True)
    # Returned unchanged when no object matches (preserves original behavior).
    df_all = pd.DataFrame(columns=['ISIN', 'Mnemonic', 'SecurityDesc', 'SecurityType', 'Currency',
                                   'SecurityID', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice',
                                   'EndPrice', 'TradedVolume', 'NumberOfTrades'])
    # Accumulate per-file frames and concat the list at each checkpoint:
    # avoids the quadratic cost of re-concatenating df_all every batch and
    # avoids concatenating onto the empty typeless frame (pandas FutureWarning).
    frames = []
    # One pool reused for every batch instead of a fresh pool per batch.
    with Pool(num_processes) as p:
        for i in range(0, len(objects), batch_size):
            batch_objects = objects[i:i + batch_size]
            results = p.map(read_csv_from_s3_single,
                            [bucket_name + "/" + obj.key for obj in batch_objects])
            frames.extend(results)
            if frames:
                df_all = pd.concat(frames, ignore_index=True)
            # Save the concatenated dataframe to a CSV file at this checkpoint
            df_all.to_csv('logs/data_checkpoint.csv', index=False)
    return df_all
def read_csv_from_s3_single(obj_key):
    """Fetch one CSV object from S3 and parse it into a DataFrame.

    *obj_key* is a combined '<bucket>/<key>' string as assembled by the
    batch dispatcher; the first path segment is the bucket name.
    """
    bucket, _, key = obj_key.partition('/')
    s3 = boto3.resource('s3')
    body = s3.Object(bucket_name=bucket, key=key).get().get('Body')
    text = body.read().decode('utf-8')
    return pd.read_csv(StringIO(text), delimiter=',')
if __name__ == '__main__':
    # Pull every file dated from the day before 2022-05-07 out of the bucket,
    # using one worker per available CPU and 1000 objects per batch.
    bucket_name = 'xetra-1234'
    date_str = '2022-05-07'
    batch_size = 1000  # Set the batch size
    num_processes = cpu_count()  # Set the number of processes to the number of available CPUs
    df = read_csv_from_s3_multiprocessing(
        bucket_name,
        date_str,
        batch_size=batch_size,
        num_processes=num_processes,
    )
    print(df.head())
@andysingal
Copy link
Copy Markdown
Author

concatenate multiple files

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment