@matanper
Last active September 5, 2024 18:57
Loading parquet files from S3 into timescaledb
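
The script below streams parquet files from an S3 bucket into TimescaleDB via timescaledb-parallel-copy, keeping a pickled progress log so an interrupted run can resume from the last completed batch. It assumes the timescaledb-parallel-copy binary is installed and that the target table (klaviyo_events here) already exists with columns matching the parquet schema.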
import boto3
import os
import time
from subprocess import Popen, PIPE
import pyarrow.parquet as pq
import pickle
import hashlib
TSDB_CPUS = 8
TS_URI = 'postgres://user:pass@host:39029/db?sslmode=require'
bucket_name = 'X'
log_file_path = 'log.pickle'
s3 = boto3.client(
    's3',
    region_name='us-east-1',
    aws_access_key_id='X',
    aws_secret_access_key='X'
)
def list_files_in_s3_bucket():
    # List all object keys in the specified S3 bucket.
    # A paginator is used so buckets with more than 1000 objects are fully listed.
    files = []
    try:
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name):
            for item in page.get('Contents', []):
                files.append(item['Key'])
    except Exception as e:
        print(f"Error in listing files: {e}")
    return sorted(files)
def read_files_log():
    # Read the progress log; return an empty structure if no log exists yet
    if not os.path.exists(log_file_path):
        return {
            'done': set(),
            'parts': {}
        }
    with open(log_file_path, 'rb') as handle:
        return pickle.load(handle)
def save_files_log(log):
    with open(log_file_path, 'wb') as handle:
        pickle.dump(log, handle, protocol=pickle.HIGHEST_PROTOCOL)
def process_file(key: str, log):
    # Download the parquet file under a hashed name so any S3 key maps to a valid local filename
    file_name = f'{hashlib.sha256(key.encode()).hexdigest()}.parquet'
    if not os.path.isfile(file_name):
        print(f'Downloading {key}')
        s3.download_file(bucket_name, key, file_name)
    else:
        print(f'File found for {key}')

    start = time.time()
    print(f"Processing {key} at {start}")
    if key not in log['parts']:
        log['parts'][key] = set()

    parquet = pq.ParquetFile(file_name)
    columns = ','.join(parquet.schema.names)
    # Stream the file in 1M-row batches; each batch is piped as CSV into timescaledb-parallel-copy
    for i, part in enumerate(parquet.iter_batches(batch_size=1_000_000)):
        if i in log['parts'][key]:
            print(f'Skipping part {i}')
            continue
        print(f'Loading part {i}')
        process = Popen([
            "/home/ec2-user/go/bin/timescaledb-parallel-copy",
            "-connection", TS_URI,
            "-skip-header",
            "-workers", f"{TSDB_CPUS * 2}",
            "-log-batches",
            "-table", "klaviyo_events",
            "-columns", columns
        ], stdin=PIPE)
        part.to_pandas().to_csv(process.stdin, index=False)
        process.stdin.close()
        process.wait()
        # Record the completed batch so an interrupted run can resume mid-file
        log['parts'][key].add(i)
        save_files_log(log)

    now = time.time()
    print(f"File {key} loaded to db at {now} in {now - start}s")
    # Mark the whole file as done and remove the local copy
    log['done'].add(key)
    os.remove(file_name)
    save_files_log(log)
log_progress = read_files_log()
for s3_key in list_files_in_s3_bucket():
    if s3_key not in log_progress['done']:
        process_file(s3_key, log_progress)
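
A separate snippet for inspecting the saved progress log: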
import pickle

with open('log.pickle', 'rb') as handle:
    print(pickle.load(handle))