Django + gevent + psycopg2 download whole PostgreSQL table as CSV
import csv
import logging

import gevent
import gevent.queue
from psycopg2 import (
    connect,
    sql,
)
from django.conf import (
    settings,
)
from django.http import (
    StreamingHttpResponse,
)

logger = logging.getLogger()


def csv_view(request, schema, table):
    ''' Returns a StreamingHttpResponse that contains a CSV of an entire database table

    Copyright (c) 2019 Department for International Trade. All rights reserved
    This work is licensed under the terms of the MIT license.
    For a copy, see https://opensource.org/licenses/MIT.
    '''

    # Chosen by experimentation for our case.
    # You may find better values for yours
    cursor_itersize = 1000
    queue_size = 5
    bytes_queue = gevent.queue.Queue(maxsize=queue_size)

    def put_db_rows_to_queue():
        # The csv writer "writes" its output by calling a file-like object
        # with a `write` method. This pseudo-buffer just returns each row
        # back to the caller instead of storing it anywhere.
        class PseudoBuffer:
            def write(self, value):
                return value

        csv_writer = csv.writer(PseudoBuffer())

        with \
                connect(settings.DATABASE_DSN) as conn, \
                conn.cursor(name='all_table_data') as cur:  # Named cursor => server-side cursor

            cur.itersize = cursor_itersize
            cur.arraysize = cursor_itersize

            # There is no ordering here. We just want a full dump.
            # Allows concurrent SELECT, INSERT, CREATE INDEX and CREATE TRIGGER, but
            # will block ALTER TABLE, DROP TABLE, TRUNCATE, VACUUM FULL
            cur.execute(sql.SQL("""
                SELECT
                    *
                FROM
                    {}.{}
            """).format(sql.Identifier(schema), sql.Identifier(table)))

            i = 0
            while True:
                rows = cur.fetchmany(cursor_itersize)
                if i == 0:
                    # Column names are not populated until the first rows are fetched
                    bytes_queue.put(csv_writer.writerow([
                        column_desc[0] for column_desc in cur.description
                    ]).encode('utf-8'), timeout=10)
                bytes_fetched = ''.join(
                    csv_writer.writerow(row) for row in rows
                ).encode('utf-8')
                bytes_queue.put(bytes_fetched, timeout=15)
                i += len(rows)
                if not rows:
                    break
            bytes_queue.put(csv_writer.writerow(['Number of rows: ' + str(i)]).encode('utf-8'))

    def yield_bytes_from_queue():
        # Loop while the producer greenlet is alive, then drain anything
        # it left in the queue before finishing the response
        while put_db_rows_to_queue_job or not bytes_queue.empty():
            try:
                # There will be a 0.1 second wait after the end of the data
                # from the db to when the connection is closed. Might be able
                # to avoid this, but KISS, and minor
                yield bytes_queue.get(timeout=0.1)
            except gevent.queue.Empty:
                pass

    def handle_exception(job):
        logger.exception(job.exception)

    put_db_rows_to_queue_job = gevent.spawn(put_db_rows_to_queue)
    put_db_rows_to_queue_job.link_exception(handle_exception)

    response = StreamingHttpResponse(yield_bytes_from_queue(), content_type='text/csv')
    response['Content-Disposition'] = f'attachment; filename="{schema}_{table}.csv"'
    return response
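As a usage sketch, the view can be wired up with a standard Django URL pattern that captures schema and table from the path. The route and module layout below are assumptions for illustration, not part of the gist; note also that, since the view relies on gevent.spawn, it needs to run under a gevent-based server (for example gunicorn's gevent worker).

# urls.py — hypothetical wiring for the view above
from django.urls import path

from .views import csv_view  # assumed module layout

urlpatterns = [
    # GET /tables/public/my_table/ streams public.my_table as CSV
    path('tables/<str:schema>/<str:table>/', csv_view, name='table-csv'),
]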
@philipkiely I believe they are already being escaped, which prevents SQL injection: wrapping the inputs in sql.Identifier does this. This is very similar to the examples in the psycopg2 documentation at http://initd.org/psycopg/docs/sql.html
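To illustrate the escaping, here is a minimal sketch, assuming `conn` is an open psycopg2 connection (as_string needs one to apply the server's quoting rules) and using a deliberately hostile table name:

from django.conf import settings
from psycopg2 import connect, sql

conn = connect(settings.DATABASE_DSN)
query = sql.SQL('SELECT * FROM {}.{}').format(
    sql.Identifier('public'),
    sql.Identifier('users; DROP TABLE users'),  # hostile "table name"
)
# Identifiers come out double-quoted, so the hostile value is treated as
# an (almost certainly non-existent) table name, not as executable SQL:
print(query.as_string(conn))
# SELECT * FROM "public"."users; DROP TABLE users"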