Last active
May 2, 2023 04:11
-
-
Save andysingal/2e6ec558d81b4dc26f57287d4ad3ee37 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
from datetime import datetime, timedelta
from io import StringIO, BytesIO
from multiprocessing import Pool, cpu_count

import boto3
import pandas as pd
def read_csv_from_s3_multiprocessing(bucket_name, date_str, num_processes=1, batch_size=10):
    """Load every CSV object in an S3 bucket dated on/after the day before `date_str`.

    Objects are assumed to have keys beginning with a 'YYYY-MM-DD/' prefix
    (keys that do not match are skipped rather than crashing the listing).
    Downloads are parallelized across `num_processes` workers in batches of
    `batch_size`; after each batch the accumulated data is checkpointed to
    'logs/data_checkpoint.csv'.

    Parameters
    ----------
    bucket_name : str
        Name of the S3 bucket to scan.
    date_str : str
        Reference date in 'YYYY-MM-DD' format; objects from the previous
        day onward are included.
    num_processes : int, optional
        Number of worker processes for parallel download/parse.
    batch_size : int, optional
        Number of objects dispatched to the pool per checkpoint cycle.

    Returns
    -------
    pandas.DataFrame
        All matching CSVs concatenated; an empty frame with the expected
        columns when nothing matches.
    """
    columns = ['ISIN', 'Mnemonic', 'SecurityDesc', 'SecurityType', 'Currency',
               'SecurityID', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice',
               'EndPrice', 'TradedVolume', 'NumberOfTrades']
    # Everything dated strictly before the day preceding `date_str` is excluded.
    date_dt = datetime.strptime(date_str, '%Y-%m-%d').date() - timedelta(days=1)

    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)

    def _key_date(key):
        # Parse the 'YYYY-MM-DD' prefix of an object key; None when the key
        # does not carry a date prefix (the original crashed with ValueError).
        try:
            return datetime.strptime(key.split('/')[0], '%Y-%m-%d').date()
        except ValueError:
            return None

    objects = []
    for obj in bucket.objects.all():
        key_date = _key_date(obj.key)
        if key_date is not None and key_date >= date_dt:
            objects.append(obj)

    # No matching objects: return an empty, correctly-shaped frame instead of
    # letting pd.concat([]) raise.
    if not objects:
        return pd.DataFrame(columns=columns)

    # Ensure the checkpoint directory exists before the first write.
    os.makedirs('logs', exist_ok=True)

    frames = []
    # Create the worker pool once and reuse it for every batch; the original
    # spawned (and tore down) a fresh pool per batch.
    with Pool(num_processes) as pool:
        for i in range(0, len(objects), batch_size):
            batch = objects[i:i + batch_size]
            results = pool.map(read_csv_from_s3_single,
                               [bucket_name + "/" + obj.key for obj in batch])
            frames.append(pd.concat(results, ignore_index=True))
            # Checkpoint progress so a mid-run failure does not lose
            # already-downloaded data.
            pd.concat(frames, ignore_index=True).to_csv(
                'logs/data_checkpoint.csv', index=False)

    return pd.concat(frames, ignore_index=True)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def read_csv_from_s3_single(obj_key):
    """Download one CSV object from S3 and parse it into a DataFrame.

    Parameters
    ----------
    obj_key : str
        Bucket name and object key joined by '/', e.g.
        'my-bucket/2022-05-07/file.csv'.

    Returns
    -------
    pandas.DataFrame
        The parsed contents of the CSV object.
    """
    # Split only on the first '/': everything before it is the bucket,
    # everything after it is the object key.
    bucket, key = obj_key.split('/', 1)
    s3 = boto3.resource('s3')
    raw = s3.Object(bucket_name=bucket, key=key).get().get('Body').read()
    return pd.read_csv(StringIO(raw.decode('utf-8')), delimiter=',')
if __name__ == '__main__':
    # Example run: pull everything from the given bucket dated on/after the
    # day before the reference date, using all available CPU cores.
    target_bucket = 'xetra-1234'
    reference_date = '2022-05-07'
    df = read_csv_from_s3_multiprocessing(
        target_bucket,
        reference_date,
        batch_size=1000,            # objects per pool dispatch / checkpoint
        num_processes=cpu_count(),  # one worker per available CPU
    )
    print(df.head())
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
concatenate multiple files