BOTO3 basics
#################################################################
# Reading and writing files Sparklessly from S3 with BOTO3      #
#################################################################

import io
import xml.etree.ElementTree as ET  # needed for the xml read/write below
from pprint import pprint           # needed for the discovery calls below

import boto3
import pandas as pd
import raz_client
my_bucket = "bucket_name_goes_here"
input_csv = "folder/path/goes/here/animal_rescue.csv"
input_xlsx = "folder/path/goes/here/animal_rescue.xlsx"
input_parquet = "folder/path/goes/here/animal_rescue.parquet"
table_path = "bat/my_database/hive_ext/my_table/part-00000-f8a2e6d1-d358-4907-9678-00397b4b6bde-c000.snappy.parquet"
output_path = "folder/path/goes/here/test"
my_ssl_path = "/folder/path/goes/here/certs/ca-bundle.crt"

client = boto3.client("s3")
raz_client.configure_ranger_raz(client, ssl_file=my_ssl_path)
#############
# Discovery #
#############

# Check bucket metadata: region, status, server, etc.
pprint(client.head_bucket(Bucket=my_bucket))
pprint(client.get_bucket_location(Bucket=my_bucket))

# Check whether an operation supports pagination
print(client.can_paginate("get_object"))

# Get object attributes (file size, last-modified timestamp, etc.)
pprint(client.head_object(Bucket=my_bucket, Key=input_csv))
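# Sketch: generate a time-limited presigned URL for an object. This uses the
# standard generate_presigned_url call; the one-hour expiry is illustrative,
# and RAZ-governed buckets may not honour presigned access.
url = client.generate_presigned_url(
    "get_object",
    Params={"Bucket": my_bucket, "Key": input_csv},
    ExpiresIn=3600,
)
print(url)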
# List objects in the bucket (a single list_objects_v2 call returns at most
# 1,000 keys; here we only look at the first 10)
cap = 10
for content in client.list_objects_v2(Bucket=my_bucket)["Contents"][0:cap]:
    obj_dict = client.get_object(Bucket=my_bucket, Key=content["Key"])
    print(content["Key"], obj_dict["LastModified"])

# List top-level common prefixes in the bucket (i.e. the top "directories")
paginator = client.get_paginator("list_objects")
result = paginator.paginate(Bucket=my_bucket, Delimiter="/")
for prefix in result.search("CommonPrefixes"):
    print(prefix.get("Prefix"))
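# Sketch: to walk past the 1,000-key cap, iterate a list_objects_v2 paginator
# (the prefix here is illustrative)
paginator_v2 = client.get_paginator("list_objects_v2")
for page in paginator_v2.paginate(Bucket=my_bucket, Prefix="folder/path/goes/here/"):
    for obj in page.get("Contents", []):
        print(obj["Key"], obj["Size"])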
# Experimental: accessing a table via the DynamoDB resource API
dynamodb = boto3.resource("dynamodb", region_name="eu-west-2")
table = dynamodb.Table("my_database.my_table")
print(table.creation_date_time)
##############
# Read files #
##############

def boto_read(bucket, file_path, file_type, sheet=0) -> pd.DataFrame | dict:
    valid_file_types = ["csv", "xls", "xlsx", "parquet", "xml", "ods"]
    if file_type not in valid_file_types:
        raise ValueError(f"Invalid file_type. Expected one of: {valid_file_types}")
    print(f"Attempting to read {file_type} file")
    with io.BytesIO(client.get_object(Bucket=bucket, Key=file_path)["Body"].read()) as f:
        if file_type == "csv":
            results = pd.read_csv(f)
        elif file_type in ["xlsx", "xls", "ods"]:
            results = pd.read_excel(f, sheet_name=sheet)
        elif file_type == "parquet":
            results = pd.read_parquet(f)
        elif file_type == "xml":
            root = ET.parse(f)
            results = dict()
            for child in root.iter():
                # Skip elements with no text (child.text may be None)
                if child.text and child.text.strip():
                    results[child.tag] = child.text
    return results
boto_read(bucket=my_bucket, file_path=input_xlsx, file_type="xlsx").head()
boto_read(bucket=my_bucket, file_path=output_path + ".xls", file_type="xls").head()
boto_read(bucket=my_bucket, file_path=output_path + ".ods", file_type="ods").head()
boto_read(bucket=my_bucket, file_path=input_csv, file_type="csv").head()
boto_read(bucket=my_bucket, file_path=input_parquet, file_type="parquet").head()
print(boto_read(bucket=my_bucket, file_path=output_path + ".xml", file_type="xml"))

my_df = boto_read(bucket=my_bucket, file_path=input_parquet, file_type="parquet")
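# Sketch: an alternative to get_object is the managed transfer API, which
# downloads to a local file rather than into memory (the local filename is
# illustrative)
client.download_file(Bucket=my_bucket, Key=input_parquet, Filename="animal_rescue_local.parquet")
local_df = pd.read_parquet("animal_rescue_local.parquet")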
#############
# Write csv #
#############

with io.BytesIO() as output:
    my_df.to_csv(output)
    client.put_object(Bucket=my_bucket, Key=output_path + ".csv", Body=output.getvalue())
#######################
# Write xls(x) or ods #
#######################

# Create a workbook object, add the df as a sheet, and write to an .xlsx key
with io.BytesIO() as output:
    with pd.ExcelWriter(output, engine="openpyxl") as writer:
        my_df.to_excel(writer)
    client.put_object(Bucket=my_bucket, Key=output_path + ".xlsx", Body=output.getvalue())

# As above, but with an .xls key. Note that openpyxl still produces
# xlsx-format bytes: pandas' read_excel sniffs the content, so reading back
# works, but the object is not a true legacy .xls file.
with io.BytesIO() as output:
    with pd.ExcelWriter(output, engine="openpyxl") as writer:
        my_df.to_excel(writer)
    client.put_object(Bucket=my_bucket, Key=output_path + ".xls", Body=output.getvalue())

# Create a workbook object, add the df as a sheet, and write to an .ods key
with io.BytesIO() as output:
    with pd.ExcelWriter(output, engine="odf") as writer:
        my_df.to_excel(writer)
    client.put_object(Bucket=my_bucket, Key=output_path + ".ods", Body=output.getvalue())
#################
# Write parquet #
#################

with io.BytesIO() as output:
    my_df.to_parquet(output)
    client.put_object(Bucket=my_bucket, Key=output_path + ".parquet", Body=output.getvalue())
#############
# Write xml #
#############

with io.BytesIO() as output:
    tree = ET.ElementTree(ET.XML("<QuoteWerksXML><AppVersionMajor>5.1</AppVersionMajor></QuoteWerksXML>"))
    ET.indent(tree)  # pretty-print; requires Python 3.9+
    tree.write(output, encoding="utf-8", xml_declaration=True)
    client.put_object(Bucket=my_bucket, Key=output_path + ".xml", Body=output.getvalue())
# List the files in the output folder (.get with a default avoids a
# TypeError when the prefix matches nothing)
for item in client.list_objects_v2(Bucket=my_bucket, Prefix=output_path).get("Contents", []):
    print(item["Key"])
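# Sketch: tidy up the test objects written above with delete_object; note
# that this permanently removes everything under output_path
for item in client.list_objects_v2(Bucket=my_bucket, Prefix=output_path).get("Contents", []):
    client.delete_object(Bucket=my_bucket, Key=item["Key"])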
########
# Hive #
########

# Read a Hive external table straight from its storage location - or at
# least one parquet part file of it
with io.BytesIO(client.get_object(Bucket=my_bucket, Key=table_path)["Body"].read()) as f:
    my_table = pd.read_parquet(f)

my_table.head()
my_table.shape
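# Sketch: a Hive external table is usually a whole folder of part files, so
# to read the full table, list every parquet part under the table's prefix
# and concatenate (the prefix is derived from table_path above)
table_prefix = "bat/my_database/hive_ext/my_table/"
parts = []
for obj in client.list_objects_v2(Bucket=my_bucket, Prefix=table_prefix).get("Contents", []):
    if obj["Key"].endswith(".parquet"):
        body = client.get_object(Bucket=my_bucket, Key=obj["Key"])["Body"].read()
        parts.append(pd.read_parquet(io.BytesIO(body)))
full_table = pd.concat(parts, ignore_index=True)
print(full_table.shape)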