@michalc
Created March 7, 2019 09:19
Django + gevent + psycopg2 download whole PostgreSQL table as CSV
import csv
import logging

import gevent
import gevent.queue

from psycopg2 import (
    connect,
    sql,
)

from django.conf import (
    settings,
)
from django.http import (
    StreamingHttpResponse,
)

logger = logging.getLogger()


def csv_view(request, schema, table):
    ''' Returns a StreamingHttpResponse that contains a CSV of an entire database table

    Copyright (c) 2019 Department for International Trade. All rights reserved
    This work is licensed under the terms of the MIT license.
    For a copy, see https://opensource.org/licenses/MIT.
    '''

    # Chosen by experimentation for our case.
    # You may find better values for yours
    cursor_itersize = 1000
    queue_size = 5

    bytes_queue = gevent.queue.Queue(maxsize=queue_size)

    def put_db_rows_to_queue():
        # The csv writer "writes" its output by calling a file-like object
        # with a `write` method.
        class PseudoBuffer:
            def write(self, value):
                return value
        csv_writer = csv.writer(PseudoBuffer())

        with \
                connect(settings.DATABASE_DSN) as conn, \
                conn.cursor(name='all_table_data') as cur:  # Named cursor => server-side cursor

            cur.itersize = cursor_itersize
            cur.arraysize = cursor_itersize

            # There is no ordering here. We just want a full dump.
            # Allows concurrent SELECT, INSERT, CREATE INDEX + CREATE TRIGGER, but
            # will block ALTER TABLE, DROP TABLE, TRUNCATE, VACUUM FULL
            cur.execute(sql.SQL("""
                SELECT
                    *
                FROM
                    {}.{}
            """).format(sql.Identifier(schema), sql.Identifier(table)))

            i = 0
            while True:
                rows = cur.fetchmany(cursor_itersize)

                if i == 0:
                    # Column names are not populated until the first row fetched
                    bytes_queue.put(csv_writer.writerow([
                        column_desc[0] for column_desc in cur.description
                    ]), timeout=10)

                bytes_fetched = ''.join(
                    csv_writer.writerow(row) for row in rows
                ).encode('utf-8')

                bytes_queue.put(bytes_fetched, timeout=15)
                i += len(rows)

                if not rows:
                    break

            bytes_queue.put(csv_writer.writerow(['Number of rows: ' + str(i)]))

    def yield_bytes_from_queue():
        while put_db_rows_to_queue_job:
            try:
                # There will be a 0.1 second wait after the end of the data
                # from the db to when the connection is closed. Might be able
                # to avoid this, but KISS, and minor
                yield bytes_queue.get(timeout=0.1)
            except gevent.queue.Empty:
                pass

    def handle_exception(job):
        logger.exception(job.exception)

    put_db_rows_to_queue_job = gevent.spawn(put_db_rows_to_queue)
    put_db_rows_to_queue_job.link_exception(handle_exception)

    response = StreamingHttpResponse(yield_bytes_from_queue(), content_type='text/csv')
    response['Content-Disposition'] = f'attachment; filename="{schema}_{table}.csv"'
    return response
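
For illustration, a minimal sketch of how a view like this might be wired into a project's URLconf; the module path and route pattern below are assumptions, not part of the gist:

    # urls.py (hypothetical layout; adjust the import to wherever csv_view lives)
    from django.urls import path

    from myapp.views import csv_view

    urlpatterns = [
        # schema and table are captured from the URL and passed through to csv_view
        path('tables/<str:schema>/<str:table>/csv', csv_view, name='table-csv'),
    ]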

michalc commented Mar 7, 2019

Usage notes

  • This view function would only make sense if Django is running via gevent, for example from a gevent worker created by gunicorn (see the sketch after this list)

  • If behind nginx, you might need the setting,

    proxy_max_temp_file_size 0;
    

    to avoid hitting timeouts if the client doesn't fetch data as fast as the intermediate queue is being filled from the database.
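
For illustration, a gunicorn configuration that runs Django under gevent workers might look like this (the values and project module are placeholders, not part of the gist):

    # gunicorn.conf.py -- worker_class 'gevent' makes gunicorn use gevent-based workers,
    # so the view above can yield to other greenlets while streaming
    worker_class = 'gevent'
    workers = 2
    bind = '0.0.0.0:8000'

started with something like gunicorn -c gunicorn.conf.py myproject.wsgi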

@philipkiely

That is really neat! One thing I would point out is to make sure to sanitize the inputs table and schema: because they are passed directly into the SQL query, they would be easy targets for SQL injection if exposed to end users. I personally would add the validation within this view itself.

michalc commented May 22, 2019

@philipkiely I believe they are already being escaped, which prevents SQL injection. The wrapping of the inputs by sql.Identifier does this:

cur.execute(sql.SQL("""
    SELECT
        *
    FROM
        {}.{}
""").format(sql.Identifier(schema), sql.Identifier(table)))

This is very similar to the examples in the psycopg2 documentation at http://initd.org/psycopg/docs/sql.html
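
As a rough illustration of what that quoting produces (the identifier values below are made up, and as_string just renders the composed query for a given connection):

    from psycopg2 import connect, sql
    from django.conf import settings

    conn = connect(settings.DATABASE_DSN)
    query = sql.SQL('SELECT * FROM {}.{}').format(
        sql.Identifier('public'),
        sql.Identifier('my";DROP TABLE users;--'),
    )
    # Identifiers are double-quoted and embedded double quotes are doubled,
    # so the values cannot break out of the identifier position
    print(query.as_string(conn))
    # SELECT * FROM "public"."my"";DROP TABLE users;--"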

@philipkiely

My mistake @michalc, thanks for pointing that out!
