Created
January 14, 2013 17:39
-
-
Save EBNull/4531793 to your computer and use it in GitHub Desktop.
Replacement for csv.reader that reads encoded input into a dict based on column definitions.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
class StreamedDataConverter(object):
    """Convert an iterator of row lists into dicts described by ``cols``.

    Subclasses (or callers, by assigning the attribute) provide ``cols``:

    * a list of column names -- each row value is stored under that name;
    * or a dict mapping an output name to an options dict, which may hold
      ``column`` (the input header to read from, defaulting to the output
      name) and ``convert`` (a callable applied to the cell value).

    ``map_fieldnames`` must be called with the header row before iterating.

    >>> class MyConverter(StreamedDataConverter):
    ...     cols = ['a', 'b', 'c']
    ...
    >>> c = MyConverter(iter([[4, 5, 6], [6, 7, 8]]))
    >>> c.map_fieldnames(['a', 'b', 'c'])
    >>> list(c) == [{'a': 4, 'b': 5, 'c': 6}, {'a': 6, 'b': 7, 'c': 8}]
    True
    >>> class MyConverter(StreamedDataConverter):
    ...     cols = dict(
    ...         a=dict(convert=int),
    ...         q=dict(column='b', convert=int),
    ...         c=dict(convert=lambda z: 'nope'),
    ...     )
    ...
    >>> c = MyConverter(iter([[4, 5, 6], [6, 7, 8]]))
    >>> c.map_fieldnames(['a', 'b', 'c'])
    >>> list(c) == [{'a': 4, 'q': 5, 'c': 'nope'}, {'a': 6, 'q': 7, 'c': 'nope'}]
    True
    """

    # When true, values of input columns that matched no entry in ``cols``
    # (stored under the ``None`` key) are removed from every result dict.
    drop_blank = True
    # Codec used to decode byte strings; 'utf-8-sig' also strips a BOM.
    encoding = 'utf-8-sig'

    def __init__(self, data_iterator, encoding=None):
        """Wrap ``data_iterator``, an iterator yielding one list per row.

        ``encoding`` overrides the class-level default when truthy.
        """
        if encoding:
            self.encoding = encoding
        self.data_iter = data_iterator
        # Input line counter used in error messages; line 1 is the header.
        self.i = 1

    def __iter__(self):
        """Reset the line counter and return self (the object is its own iterator)."""
        self.i = 1
        return self

    def _decode(self, value):
        """Decode byte strings with ``self.encoding``; pass anything else through."""
        if isinstance(value, bytes):
            return value.decode(self.encoding)
        return value

    @staticmethod
    def _header_candidates(header):
        """Yield progressively normalised spellings of an input header.

        Order: as given, lower-cased, whitespace-collapsed, then with spaces
        replaced by underscores. Each step builds on the previous one.
        """
        yield header
        header = header.lower()
        yield header
        header = ' '.join(header.strip().split())
        yield header
        yield header.replace(' ', '_')

    def map_fieldnames(self, fields):
        """Match the input header row against ``cols`` and remember the mapping.

        ``fields`` is the list of header cells (byte or text strings). Each
        header is matched against the expected source columns, trying the
        spellings produced by ``_header_candidates``. Headers that match
        nothing are mapped to ``None``.

        Raises:
            ValueError: if any column required by ``cols`` is absent from
                ``fields``.
        """
        # Map "source column in the input" -> "key in the output dict".
        # Built locally so the shared, class-level ``cols`` is never mutated.
        if hasattr(self.cols, 'items'):
            pending = dict(
                (opts.get('column', colname), colname)
                for colname, opts in self.cols.items()
            )
        else:
            pending = dict((name, name) for name in self.cols)

        fieldnames = []
        for raw_header in fields:
            header = self._decode(raw_header)
            for candidate in self._header_candidates(header):
                if candidate in pending:
                    # Each expected column is consumed by its first match.
                    fieldnames.append(self._decode(pending.pop(candidate)))
                    break
            else:
                fieldnames.append(None)

        if pending:
            raise ValueError(
                "These columns were not in the input file: %s" % (','.join(pending))
            )
        self._fieldnames = fieldnames

    def __next__(self):
        """Return the next row as a dict keyed by the mapped field names.

        Raises:
            StopIteration: when the underlying iterator is exhausted.
            ValueError: if a cell cannot be decoded, or a ``convert``
                callable fails; the message carries the input line number.
        """
        try:
            row = next(self.data_iter)
        except StopIteration:
            self.i = 0
            raise
        self.i += 1

        try:
            ret = dict(zip(self._fieldnames, [self._decode(cell) for cell in row]))
        except UnicodeDecodeError as e:
            raise ValueError("Encoding error on input line %d: %s" % (self.i, e))

        if self.drop_blank:
            # Unmapped input columns all land under the None key; discard it.
            ret.pop(None, None)

        if not hasattr(self.cols, 'items'):
            return ret

        # ``cols`` is a dict with per-column options: apply the converters.
        for colname, opts in self.cols.items():
            if 'convert' not in opts:
                continue
            param = ret[colname]
            try:
                converted = opts['convert'](param)
            except Exception as e:
                # TODO: raise exception relating to the actual column
                raise ValueError(
                    "Problem converting column '%s' on input line %d: %s"
                    % (colname, self.i, e)
                )
            ret[colname] = converted
        return ret

    # Python 2 iterator protocol spelling.
    next = __next__
class NormalDictReader(object):
    r"""A csv.DictReader replacement driven by column definitions in a subclass.

    The subclass supplies ``cols`` (a list of names, or a dict of per-column
    options). The first row read from the file is treated as the header and
    is matched against ``cols``; every following row is yielded as a dict.
    Decoding and conversion are delegated to ``StreamedDataConverter``.

    Examples (Python 2):

        class MyReader(NormalDictReader):
            cols = ['a', 'b', 'c']

    >>> class MyReader(NormalDictReader):
    ...     cols = dict(
    ...         a = dict(column='a', convert=int),
    ...         q = dict(column='b'),
    ...         c = dict(column='c', convert=lambda x: int(x)),
    ...     )
    ...
    >>> import StringIO
    >>> rows = list(MyReader(StringIO.StringIO("a,b,c\n1,2,3\n4,5,6")))
    >>> rows == [{'a': 1, 'q': '2', 'c': 3}, {'a': 4, 'q': '5', 'c': 6}]
    True
    """

    # Forwarded to the converter: drop values of columns not named in ``cols``.
    drop_blank = True
    # Codec handed to the converter for decoding byte strings.
    encoding = 'utf-8-sig'

    def __init__(self, openfile, encoding=None, *args, **kwargs):
        """Read the header row from ``openfile`` and prepare the converter.

        ``encoding`` overrides the class default when truthy; any further
        positional and keyword arguments are passed to ``csv.reader``.

        Raises:
            StopIteration: propagated from ``next()`` when the input has no
                header row at all.
        """
        if encoding:
            self.encoding = encoding
        self._r = csv.reader(openfile, *args, **kwargs)
        self._conv = StreamedDataConverter(self._r, encoding=self.encoding)
        self._conv.cols = self.cols
        # Keep the converter in step with this reader's setting; otherwise a
        # subclass overriding ``drop_blank`` would have no effect.
        self._conv.drop_blank = self.drop_blank
        self._conv.map_fieldnames(next(self._r))

    def __iter__(self):
        """Return self; the reader is its own iterator."""
        return self

    def __next__(self):
        """Return the next converted row dict from the underlying converter."""
        return next(self._conv)

    # Python 2 iterator protocol spelling.
    next = __next__
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment