Skip to content

Instantly share code, notes, and snippets.

@halityurttas
Created April 22, 2025 20:18
Show Gist options
  • Save halityurttas/f5de1e6b2f2632e764f51117b6593e51 to your computer and use it in GitHub Desktop.
Save halityurttas/f5de1e6b2f2632e764f51117b6593e51 to your computer and use it in GitHub Desktop.
CSV to Parquet on OpenPaas
import io
import os

import pandas as pd
import pika
import requests
# Runtime configuration. Each value is read from the process environment and
# falls back to the literal default when the variable is not set.
RABBITMQ_URL = os.environ.get("rabbitmq_url", "amqp://guest:guest@rabbitmq:5672/")  # broker connection URL
QUEUE_NAME = os.environ.get("queue_name", "csv_download")  # queue that delivers CSV URLs
OUTPUT_PATH = os.environ.get("output_path", "/data/output.parquet")  # where the Parquet file is written
def download_csv_and_convert_to_parquet(url, output_path, timeout=30):
    """Download a CSV file over HTTP and write it to disk as Parquet.

    Args:
        url: Address of the CSV file to fetch.
        output_path: Destination path of the Parquet file.
        timeout: Timeout in seconds handed to ``requests.get`` so a stalled
            server cannot block the caller forever.

    Raises:
        requests.RequestException: On network failure, timeout, or a
            non-success HTTP status (via ``raise_for_status``).
        UnicodeDecodeError: If the response body is not valid UTF-8.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    csv_data = response.content.decode('utf-8')
    # pd.compat.StringIO no longer exists in current pandas; the standard
    # library's io.StringIO gives read_csv the same in-memory text buffer.
    df = pd.read_csv(io.StringIO(csv_data))
    df.to_parquet(output_path, engine='pyarrow')
    print(f"File saved to {output_path}")
def callback(ch, method, properties, body):
    """Handle one queue message whose body is the URL of a CSV file.

    The body is decoded as UTF-8 and passed to
    ``download_csv_and_convert_to_parquet`` together with ``OUTPUT_PATH``.
    Any failure is printed rather than raised, and the message is always
    acknowledged afterwards.

    Args:
        ch: Channel the message arrived on; used to send the ack.
        method: Delivery metadata; supplies ``delivery_tag`` for the ack.
        properties: Message properties (unused).
        body: Raw message payload as bytes.
    """
    # Fallback label for the error message in case the payload cannot be
    # decoded at all.
    url = repr(body)
    try:
        # Decoding happens inside the try block so that a malformed payload
        # is reported and acked like any other failure instead of raising
        # out of the callback with the message still unacknowledged.
        url = body.decode('utf-8')
        download_csv_and_convert_to_parquet(url, OUTPUT_PATH)
    except Exception as e:
        print(f"Error processing URL {url}: {e}")
    finally:
        ch.basic_ack(delivery_tag=method.delivery_tag)
def main():
    """Connect to RabbitMQ and consume messages from ``QUEUE_NAME``.

    Declares the queue as durable, registers ``callback`` as the message
    handler and blocks in the consume loop. Ctrl-C stops consuming cleanly,
    and the connection is closed on the way out whatever the exit reason.
    """
    params = pika.URLParameters(RABBITMQ_URL)
    connection = pika.BlockingConnection(params)
    try:
        channel = connection.channel()
        channel.queue_declare(queue=QUEUE_NAME, durable=True)
        print(f"Waiting for messages in queue: {QUEUE_NAME}")
        channel.basic_consume(queue=QUEUE_NAME, on_message_callback=callback)
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            # Leave the consume loop gracefully on Ctrl-C.
            channel.stop_consuming()
    finally:
        # Skip close() if the connection already dropped, so the original
        # error is not masked by a wrong-state exception.
        if connection.is_open:
            connection.close()
# Start the consumer only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment