Skip to content

Instantly share code, notes, and snippets.

@mitjat
Created January 9, 2012 15:37
Show Gist options
  • Save mitjat/1583453 to your computer and use it in GitHub Desktop.
Save mitjat/1583453 to your computer and use it in GitHub Desktop.
A specialized CharField which leniently parses its input as CSV (comma-separated values). Ideal for when data is to be imported from Excel - users only need to copy&paste. Also supports basic data validation.
#-*- coding:utf-8 -*-
from django.utils.translation import ugettext_lazy as _
from django import forms
from django.conf import settings
from django.utils.encoding import smart_unicode
from django.utils.encoding import force_unicode
from django.core.exceptions import ValidationError
class CSVTableField(forms.CharField):
    """
    A specialized CharField which parses its input as a comma-separated
    (also tab-separated, semicolon-separated) table and also allows basic
    data validation within the table.

    A quick&dirty way to turn a Textarea into a simple table input.
    Also ideal for when data is to be imported from e.g. Excel
    (users only need to copy&paste).
    """
    widget = forms.Textarea

    def __init__(self, *args, **kwargs):
        """
        Takes two keyword arguments, required_cols and optional_cols, describing
        what columns must/can constitute a well-formed user input.
        Each of these arguments is a list of tuples (alias1, alias2, ...,
        aliasN, cleanFunc). See below for their interpretation.
        All other arguments are passed on to forms.CharField, the parent class.

        Validation is performed as follows:
        User input is required to have a header row. The header row must consist
        only of aliases presented in required_cols or optional_cols (matched
        case-insensitively). All required_cols must be present. The remaining
        rows must have the same number of columns as the header row. Each value
        is passed to the cleanFunc of the corresponding column; a ValueError
        raised by cleanFunc is reported as a ValidationError naming the row and
        column, while a ValidationError raised by cleanFunc propagates as-is.
        unicode() is a good no-op cleanFunc candidate.

        cleaned_data produced by this field is a list of dicts (one per data
        row); each dict maps from column name (alias1 of the column) to the
        output of the cleanFunc called with the column's value in that row.
        """
        self.required_cols = kwargs.pop('required_cols', [])
        self.optional_cols = kwargs.pop('optional_cols', [])
        super(CSVTableField, self).__init__(*args, **kwargs)

    @staticmethod
    def parse_csv(txt):
        """
        Take a CSV-formatted table, return a list of lists (of strings).

        The field delimiter is guessed as whichever of ';', ',' and tab occurs
        most often in the text. Fields may be wrapped in double quotes; inside
        a quoted field a doubled quote ("") stands for one literal quote, and
        delimiters and newlines are taken literally.

        Raises ValidationError if a quoted field is followed by anything other
        than a delimiter or newline, if a quoted field is never closed, or if
        the rows do not all have the same number of fields.
        """
        # normalize newlines and ending newline
        txt = txt.replace('\r\n', '\n').replace('\r', '\n').rstrip('\n') + '\n'
        DELIM = max(';,\t', key=txt.count)  # guess the field delimiter
        QUOTE = '"'

        # parse excel CSV -- state machine.
        # states:
        IN_QUOTED_FIELD = 1
        IN_UNQUOTED_FIELD = 2
        FIELD_JUST_STARTED = 3
        FIELD_JUST_ENDED = 4
        # A quote was just read inside a quoted field. Whether it closes the
        # field or starts an escaped quote ("") is decided by the NEXT char.
        # Tracking this as a state (instead of peeking at neighbouring chars)
        # is what makes `""` (empty field) and `"a"""` parse correctly.
        QUOTE_IN_QUOTED_FIELD = 5

        state = FIELD_JUST_STARTED
        chars = []  # characters of the field being read; joined once per field
        row = []
        table = []
        for ch in txt:
            if state == IN_QUOTED_FIELD:
                if ch == QUOTE:
                    state = QUOTE_IN_QUOTED_FIELD
                else:
                    chars.append(ch)
                continue

            if state == QUOTE_IN_QUOTED_FIELD:
                if ch == QUOTE:
                    # doubled quote -> one literal quote, still inside the field
                    chars.append(QUOTE)
                    state = IN_QUOTED_FIELD
                    continue
                # the previous quote closed the field; handle `ch` below
                state = FIELD_JUST_ENDED

            if state == FIELD_JUST_ENDED and ch not in (DELIM, '\n'):
                raise ValidationError(_('Unpaired quote appears inside a quoted field.'))

            if ch == QUOTE and state == FIELD_JUST_STARTED:
                state = IN_QUOTED_FIELD
            elif ch == DELIM:
                row.append(''.join(chars))
                chars = []
                state = FIELD_JUST_STARTED
            elif ch == '\n':
                row.append(''.join(chars))
                chars = []
                table.append(row)
                row = []
                state = FIELD_JUST_STARTED
            else:
                # a quote in the middle of an unquoted field is kept literally
                state = IN_UNQUOTED_FIELD
                chars.append(ch)

        # The text always ends with '\n', so the only way not to finish in
        # FIELD_JUST_STARTED is to still be inside a quoted field.
        if state != FIELD_JUST_STARTED:
            raise ValidationError(_("Unclosed quote"))
        # Defensive: the trailing newline added above yields at least one row.
        if not table:
            raise ValidationError(_("Table has no headers"))
        if any(len(row) != len(table[0]) for row in table):
            raise ValidationError(_("Table rows do not have an equal number of elements"))
        return table

    def clean(self, value):
        """
        Validate the raw text and return the table as a list of dicts.

        Runs the parent CharField cleaning first, then parses the text with
        parse_csv(), checks the header row against required_cols and
        optional_cols, and passes every cell through its column's cleanFunc.

        Returns a list with one dict per data row, mapping the canonical column
        name (alias1) to the cleaned value. An empty value (which the parent
        class only lets through when the field is not required) yields [].

        Raises ValidationError for an unknown header, a missing required
        column, or a cell whose cleanFunc raises ValueError.
        """
        txt = super(CSVTableField, self).clean(value)
        if not txt:
            # Optional field left blank: there is no table to validate.
            return []

        table = self.parse_csv(txt)
        headers = table[0]
        # list() so that required_cols/optional_cols may be any mix of
        # lists and tuples
        allowed_cols = list(self.required_cols) + list(self.optional_cols)

        # verify all headers are allowed
        cleaning_fns = []  # cleaning function for each header
        for i, hdr in enumerate(headers):
            for col_info in allowed_cols:
                if hdr.lower() in [alias.lower() for alias in col_info[:-1]]:
                    # this header matches one of the aliases of col_info
                    cleaning_fns.append(col_info[-1])
                    headers[i] = col_info[0]  # normalize column name
                    break
            else:
                # no match found
                raise ValidationError(_("Unknown column name: %(col)s. Only the following are allowed: %(opts)s") % {
                    'col': repr(hdr),
                    # join the alias strings as they are; encoding them to
                    # bytes first breaks the join / interpolation
                    'opts': ', '.join('/'.join(col[:-1]) for col in allowed_cols),
                })

        # verify no column is missing
        for col_info in self.required_cols:
            if col_info[0] not in headers:
                raise ValidationError(_("Required column missing: ") + '/'.join(col_info[:-1]))

        # parse/clean table values
        clean_table = []  # table of rows; row is dict column_name->cleaned_value
        for row_i, row in enumerate(table[1:]):
            clean_row = {}
            for hdr, field, clean_fn in zip(headers, row, cleaning_fns):
                try:
                    clean_row[hdr] = clean_fn(field)
                except ValueError:
                    # row numbers in the message are 1-based, header excluded
                    raise ValidationError(_("Invalid value for field %(field)s in row %(row)d: %(val)s") % {
                        'field': repr(hdr), 'row': row_i + 1, 'val': repr(field)})
            clean_table.append(clean_row)
        return clean_table
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment