Skip to content

Instantly share code, notes, and snippets.

@saswata-dutta
Created August 8, 2023 10:26
Show Gist options
  • Save saswata-dutta/5ed2b624eaeb89f06a00513133428e79 to your computer and use it in GitHub Desktop.
Save saswata-dutta/5ed2b624eaeb89f06a00513133428e79 to your computer and use it in GitHub Desktop.
S3 Select over a "folder prefix": run one SQL query against every object under a date-partitioned key prefix.
import boto3
import datetime
# S3 client shared by fetch_result and fetch_keys.
s3 = boto3.client("s3")
# Bucket that is listed and queried; "???" is presumably a redacted
# placeholder that must be replaced before running -- TODO confirm.
bucket = "???"
# Leading key segment that to_prefix puts in front of the
# year=/month=/day= partition path.
prefix_base = "actionType=cancel"
# S3 Select SQL expression that fetch_result runs against each object.
# The clientid value '???' looks like a placeholder as well -- TODO confirm.
query = """SELECT subjectid, s."timestamp" FROM s3object s where clientid = '???' and status = 'UNCANCELLABLE'"""
def fetch_result(key):
    """Run the module-level ``query`` against one object and print the rows.

    The object is read with Parquet input serialization and the result is
    requested as CSV. The complete CSV result is printed to stdout once,
    with surrounding whitespace stripped; nothing is printed when the query
    matches no rows.

    Args:
        key: Key of the object inside ``bucket`` to query.

    Returns:
        None. The rows are written to stdout.
    """
    resp = s3.select_object_content(
        Bucket=bucket,
        Key=key,
        ExpressionType="SQL",
        Expression=query,
        InputSerialization={"Parquet": {}},
        OutputSerialization={"CSV": {}},
    )
    # The payload of a Records event is an arbitrary slice of the result byte
    # stream: a CSV row, or even a single multi-byte UTF-8 character, can be
    # split across two events. Collect the raw bytes and decode them once so
    # rows are never broken in the middle and decoding cannot fail on a
    # partial character.
    chunks = [
        event["Records"]["Payload"]
        for event in resp["Payload"]
        if "Records" in event
    ]
    records = b"".join(chunks).decode("utf-8").strip()
    if records:
        print(records)
def fetch_keys(prefix):
    """Return the keys of all objects in ``bucket`` that start with *prefix*.

    Args:
        prefix: Key prefix to list.

    Returns:
        A list of object keys. The list is empty when no object matches the
        prefix.
    """
    # A single list_objects_v2 call returns at most 1000 keys, so walk every
    # page instead of reading only the first response.
    paginator = s3.get_paginator("list_objects_v2")
    pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
    return [
        item["Key"]
        for page in pages
        # "Contents" is missing from the response when nothing matches.
        for item in page.get("Contents", [])
    ]
def to_date(str_date):
    """Parse a ``YYYY-MM-DD`` string into a ``datetime.datetime`` at midnight.

    Args:
        str_date: Date text in ``%Y-%m-%d`` format.

    Returns:
        The parsed ``datetime.datetime``.

    Raises:
        ValueError: If *str_date* does not match the expected format.
    """
    day_format = "%Y-%m-%d"
    parsed = datetime.datetime.strptime(str_date, day_format)
    return parsed
def date_range(start, end):
    """List every day from *start* through *end*, both inclusive.

    Args:
        start: First day as a ``YYYY-MM-DD`` string.
        end: Last day as a ``YYYY-MM-DD`` string.

    Returns:
        A list of ``datetime.datetime`` values, one per day, in ascending
        order. The list is empty when *end* is before *start*.
    """
    first_day = to_date(start)
    last_day = to_date(end)
    # +1 so that the end day itself is part of the result.
    day_count = (last_day - first_day).days + 1
    one_day = datetime.timedelta(days=1)
    return [first_day + offset * one_day for offset in range(day_count)]
def to_prefix(date):
    """Build the key prefix of the partition that holds *date*.

    Args:
        date: Object with a ``strftime`` method, such as the values
            produced by ``date_range``.

    Returns:
        ``prefix_base`` followed by the ``year=/month=/day=`` partition path
        and a trailing slash.
    """
    day_partition = date.strftime("year=%Y/month=%m/day=%d")
    return f"{prefix_base}/{day_partition}/"
def prefix_range(start, end):
    """Return one key prefix per day from *start* through *end*, inclusive.

    Args:
        start: First day as a ``YYYY-MM-DD`` string.
        end: Last day as a ``YYYY-MM-DD`` string.

    Returns:
        A list of prefixes in ascending date order.
    """
    return list(map(to_prefix, date_range(start, end)))
def fetch_keys_range(start, end):
    """Collect the object keys of every daily prefix in the date range.

    Args:
        start: First day as a ``YYYY-MM-DD`` string.
        end: Last day as a ``YYYY-MM-DD`` string.

    Returns:
        A flat list of keys, grouped by day in ascending date order.
    """
    collected = []
    for day_prefix in prefix_range(start, end):
        collected.extend(fetch_keys(day_prefix))
    return collected
def query_range(start, end):
    """Run the query against every object in the date range.

    All keys are listed first; each one is then passed to ``fetch_result``,
    which prints its matching rows.

    Args:
        start: First day as a ``YYYY-MM-DD`` string.
        end: Last day as a ``YYYY-MM-DD`` string.
    """
    for object_key in fetch_keys_range(start, end):
        fetch_result(object_key)
# Script entry: query every day from 2022-08-15 through 2022-08-19, both
# inclusive. This runs as soon as the module is executed.
query_range("2022-08-15", "2022-08-19")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment