Last active
September 5, 2024 18:57
-
-
Save matanper/41726ef6223419f62a5bbe584c5ae1d8 to your computer and use it in GitHub Desktop.
Loading Parquet files from S3 into TimescaleDB
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import os | |
import time | |
from subprocess import Popen, PIPE | |
import pyarrow.parquet as pq | |
import pickle | |
import hashlib | |
# --- Run configuration ------------------------------------------------------
# NOTE(review): the database URI and AWS keys below are hard-coded
# placeholders. Prefer loading real values from the environment or a secrets
# store instead of committing them to source.

# Source bucket and the local pickle file that records load progress.
log_file_path = 'log.pickle'
bucket_name = 'X'

# Target database. The copy tool is started with TSDB_CPUS * 2 workers.
TS_URI = 'postgres://user:pass@host:39029/db?sslmode=require'
TSDB_CPUS = 8

# Shared S3 client used for both listing and downloading objects.
s3 = boto3.client(
    's3',
    aws_access_key_id='X',
    aws_secret_access_key='X',
    region_name='us-east-1',
)
def list_files_in_s3_bucket():
    """Return the sorted keys of every object in the configured S3 bucket.

    A single ``list_objects_v2`` call returns at most 1000 keys, so the
    listing is driven through the client's paginator to cover buckets of any
    size.

    Listing is best-effort: any error is printed and swallowed, and whatever
    keys were collected before the failure are still returned.

    Returns:
        list[str]: Object keys in ascending order. Empty when the bucket has
        no objects or listing failed before the first page was read.
    """
    files = []
    try:
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name):
            # 'Contents' is absent from the response for an empty bucket.
            files.extend(item['Key'] for item in page.get('Contents', []))
    except Exception as e:
        print(f"Error in listing files: {e}")
    return sorted(files)
def read_files_log():
    """Load the progress log from ``log_file_path``, or start a fresh one.

    Returns:
        The object unpickled from the log file when it exists. Otherwise a
        new log of the form ``{'done': set(), 'parts': {}}``.
    """
    if os.path.exists(log_file_path):
        # NOTE: pickle must only be used on trusted files; this one is
        # expected to have been written by save_files_log().
        with open(log_file_path, 'rb') as log_handle:
            return pickle.load(log_handle)

    # No log yet: nothing finished, no partial progress recorded.
    return {'done': set(), 'parts': {}}
def save_files_log(log):
    """Persist the progress log to ``log_file_path``.

    The log is first pickled to a temporary sibling file and then moved over
    the real path with ``os.replace``. If the script is interrupted while
    writing, the previous log stays intact instead of being left as a
    truncated pickle that cannot be loaded on the next run.

    Args:
        log: The progress log object to pickle.
    """
    tmp_path = f'{log_file_path}.tmp'
    with open(tmp_path, 'wb') as handle:
        pickle.dump(log, handle, protocol=pickle.HIGHEST_PROTOCOL)
    # Swap the fully written file into place in a single rename.
    os.replace(tmp_path, log_file_path)
def process_file(key: str, log):
    """Download one parquet object from S3 and copy it into the database.

    The object is saved locally under a name derived from the SHA-256 of its
    key (an existing local copy is reused). It is then read in batches of
    1,000,000 rows; each batch is converted to CSV and piped to
    ``timescaledb-parallel-copy``. The index of every batch that finished is
    stored in ``log['parts'][key]`` and the log is saved after each batch, so
    a rerun skips batches that were already loaded. When all batches are
    done the key is added to ``log['done']``, the local file is deleted and
    the log is saved again.

    Args:
        key: S3 object key of the parquet file to load.
        log: Progress log with ``'done'`` and ``'parts'`` entries; mutated in
            place.

    Raises:
        RuntimeError: If ``timescaledb-parallel-copy`` exits with a non-zero
            status. The failing batch is NOT recorded as loaded.
    """
    file_name = f'{hashlib.sha256(key.encode()).hexdigest()}.parquet'
    if not os.path.isfile(file_name):
        print(f'Downloading {key}')
        s3.download_file(bucket_name, key, file_name)
    else:
        print(f'File found for {key}')

    start = time.time()
    print(f"Processing {key} at {time.time()}")

    # Batch indices already loaded for this key (empty on the first attempt).
    loaded_parts = log['parts'].setdefault(key, set())

    parquet = pq.ParquetFile(file_name)
    columns = ','.join(parquet.schema.names)

    # The command line is identical for every batch, so build it once.
    copy_command = [
        "/home/ec2-user/go/bin/timescaledb-parallel-copy",
        "-connection", TS_URI,
        "-skip-header",  # to_csv() emits a header row for every batch
        "-workers", f"{TSDB_CPUS * 2}",
        "-log-batches",
        "-table", "klaviyo_events",
        '-columns', columns,
    ]

    for i, part in enumerate(parquet.iter_batches(batch_size=1_000_000)):
        if i in loaded_parts:
            print(f'Skipping part {i}')
            continue

        print(f'Loading part {i}')
        # The context manager closes stdin and waits for the child on exit,
        # including when writing the CSV raises.
        with Popen(copy_command, stdin=PIPE) as process:
            part.to_pandas().to_csv(process.stdin, index=False)

        if process.returncode != 0:
            # Do not record a failed batch as loaded; otherwise a rerun would
            # skip it and its rows would silently be missing.
            # NOTE(review): a failed copy may already have inserted some
            # rows — confirm how duplicates are handled before retrying.
            raise RuntimeError(
                f"timescaledb-parallel-copy exited with status "
                f"{process.returncode} while loading part {i} of {key}"
            )

        loaded_parts.add(i)
        save_files_log(log)

    now = time.time()
    print(f"File {key} loaded to db at {now} in {now - start}s")

    # Mark the whole object as finished and drop the local copy.
    log['done'].add(key)
    os.remove(file_name)
    save_files_log(log)
# Resume from the saved progress log and load every object in the bucket
# that has not been fully processed yet.
log_progress = read_files_log()
for s3_key in list_files_in_s3_bucket():
    if s3_key in log_progress['done']:
        continue
    process_file(s3_key, log_progress)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pickle | |
# Debug helper: print the contents of the saved progress log.
with open('log.pickle', 'rb') as handle:
    saved_log = pickle.load(handle)
print(saved_log)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment