Created
January 9, 2012 15:37
-
-
Save mitjat/1583453 to your computer and use it in GitHub Desktop.
A specialized CharField which leniently parses its input as CSV (comma-separated values). Ideal for when data is to be imported from Excel - users only need to copy&paste. Also supports basic data validation.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#-*- coding:utf-8 -*- | |
from django.utils.translation import ugettext_lazy as _ | |
from django import forms | |
from django.conf import settings | |
from django.utils.encoding import smart_unicode | |
from django.utils.encoding import force_unicode | |
from django.core.exceptions import ValidationError | |
class CSVTableField(forms.CharField):
    """
    A specialized CharField which parses its input as a comma-separated
    (also tab-separated, semicolon-separated) table and also allows basic
    data validation within the table.

    A quick&dirty way to turn a Textarea into a simple table input.
    Also ideal for when data is to be imported from e.g. Excel
    (users only need to copy&paste).
    """

    widget = forms.Textarea

    def __init__(self, *args, **kwargs):
        """
        Takes two keyword arguments, required_cols and optional_cols, describing
        what columns must/can constitute a well-formed user input.
        Each of these arguments is a list of tuples (alias1, alias2, ...,
        aliasN, cleanFunc). See below for their interpretation.
        All other arguments are passed on to forms.CharField, the parent class.

        Validation is performed as follows:
        User input is required to have a header row. The header row must consist
        only of aliases presented in required_cols or optional_cols. All
        required_cols must be present. The remaining rows must have the same
        number of columns as the header row. Each value is passed to the
        cleanFunc of the corresponding column; cleanFunc may throw a
        ValidationError (propagated as is) or a ValueError (converted to a
        ValidationError naming the offending column, row and value).
        unicode() is a good no-op cleanFunc candidate.

        cleaned_data produced by this field is a list of dicts, one per data
        row; each dict maps from column name (alias1 of the column) to the
        output of the cleanFunc called with the column's value in that row.
        """
        self.required_cols = kwargs.pop('required_cols', [])
        self.optional_cols = kwargs.pop('optional_cols', [])
        super(CSVTableField, self).__init__(*args, **kwargs)

    @staticmethod
    def parse_csv(txt):
        """
        Take a CSV-formatted table, return a list of lists (of strings).

        The field delimiter is guessed: whichever of ';', ',' and tab occurs
        most often in txt is used (ties resolve in that order). Fields may be
        wrapped in double quotes, in which case they may contain delimiters and
        newlines, and a literal quote is written as two consecutive quotes.

        Raises ValidationError if a closing quote is followed by anything other
        than a delimiter or newline, if a quoted field is never closed, or if
        the rows do not all have the same number of fields.
        """
        # Normalize newlines and make sure the text ends with exactly one, so
        # that the last row is always flushed by the loop below.
        txt = txt.replace('\r\n', '\n').replace('\r', '\n').rstrip('\n') + '\n'
        delim = max(';,\t', key=txt.count)  # guess the field delimiter
        quote = '"'

        # Parser states.
        IN_QUOTED_FIELD = 1
        IN_UNQUOTED_FIELD = 2
        FIELD_JUST_STARTED = 3
        FIELD_JUST_ENDED = 4  # right after the closing quote of a quoted field

        state = FIELD_JUST_STARTED
        chars = []  # characters of the field currently being read
        row = []
        table = []
        skip_next = False  # set when the second quote of an escaped "" pair must be skipped

        for pos, ch in enumerate(txt):
            if skip_next:
                skip_next = False
                continue

            if state == IN_QUOTED_FIELD:
                if ch != quote:
                    chars.append(ch)
                elif txt[pos + 1:pos + 2] == quote:
                    # Doubled quote: one literal quote. Consume both characters
                    # here so the second one is never mistaken for a closing
                    # quote (or for the start of yet another escaped pair).
                    chars.append(quote)
                    skip_next = True
                else:
                    state = FIELD_JUST_ENDED
                continue

            if state == FIELD_JUST_ENDED and ch not in (delim, '\n'):
                raise ValidationError(_('Unpaired quote appears inside a quoted field.'))

            if ch == quote and state == FIELD_JUST_STARTED:
                state = IN_QUOTED_FIELD
            elif ch == delim:
                row.append(''.join(chars))
                chars = []
                state = FIELD_JUST_STARTED
            elif ch == '\n':
                row.append(''.join(chars))
                chars = []
                table.append(row)
                row = []
                state = FIELD_JUST_STARTED
            else:
                # Any other character, including a quote that is not the first
                # character of the field, is taken literally.
                state = IN_UNQUOTED_FIELD
                chars.append(ch)

        # The text always ends with a newline; the only way not to be at the
        # start of a fresh field here is to still be inside a quoted field.
        if state != FIELD_JUST_STARTED:
            raise ValidationError(_("Unclosed quote"))
        if not table:
            raise ValidationError(_("Table has no headers"))
        width = len(table[0])
        if any(len(r) != width for r in table):
            raise ValidationError(_("Table rows do not have an equal number of elements"))
        return table

    def clean(self, value):
        """
        Validate value as a table and return it as a list of dicts.

        The first row is the header row. Each header is matched
        case-insensitively against the aliases in required_cols and
        optional_cols and replaced by the first alias of the matching column.
        Every remaining cell is passed through the cleanFunc of its column.

        Returns a list with one dict per data row, mapping normalized column
        name to cleaned value. Empty input yields an empty list.

        Raises ValidationError for malformed CSV, an unknown header, a missing
        required column, or a cell whose cleanFunc raises ValueError.
        """
        txt = super(CSVTableField, self).clean(value)
        if not txt:
            # Without this, blank input would be parsed as a table with the
            # single header '' and rejected as an unknown column.
            # NOTE(review): presumably only reachable when the field is not
            # required, as the parent clean() is expected to reject empty
            # required input -- confirm against the Django version in use.
            return []

        table = self.parse_csv(txt)
        headers = table[0]
        # list() so that tuples and lists can be mixed between the two options
        allowed_cols = list(self.required_cols) + list(self.optional_cols)

        # verify all headers are allowed
        cleaning_fns = []  # cleaning function for each header
        for i, hdr in enumerate(headers):
            for col_info in allowed_cols:
                if hdr.lower() in [alias.lower() for alias in col_info[:-1]]:
                    # this header matches one of the aliases of col_info
                    cleaning_fns.append(col_info[-1])
                    headers[i] = col_info[0]  # normalize column name
                    break
            else:
                # no match found
                # force_unicode keeps the aliases as text, so that non-ASCII
                # aliases can be interpolated into the translated message.
                raise ValidationError(_("Unknown column name: %(col)s. Only the following are allowed: %(opts)s") % {
                    'col': repr(hdr),
                    'opts': ', '.join(
                        '/'.join(force_unicode(alias) for alias in col[:-1])
                        for col in allowed_cols),
                })

        # verify no column is missing
        for col_info in self.required_cols:
            if col_info[0] not in headers:
                raise ValidationError(
                    _("Required column missing: ")
                    + '/'.join(force_unicode(alias) for alias in col_info[:-1]))

        # parse/clean table values
        clean_table = []  # table of rows; row is dict column_name->cleaned_value
        for row_i, row in enumerate(table[1:]):
            clean_row = {}
            for hdr, field, clean_fn in zip(headers, row, cleaning_fns):
                try:
                    clean_row[hdr] = clean_fn(field)
                except ValueError:
                    # row numbers in the message are 1-based and count data
                    # rows only (the header row is not counted)
                    raise ValidationError(_("Invalid value for field %(field)s in row %(row)d: %(val)s") % {
                        'field': repr(hdr), 'row': row_i + 1, 'val': repr(field)})
            clean_table.append(clean_row)
        return clean_table
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment