Created
March 25, 2015 22:49
-
-
Save rdhyee/3ee2841d411ee21aab68 to your computer and use it in GitHub Desktop.
A little module I wrote a while back to work with Unicode and CSV. (I'm hoping that I won't need it, because I can use https://github.com/jdunck/python-unicodecsv instead.)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
unicode_csv.py -- wrap the csv module to allow for the handling of CSV files with unicode | |
""" | |
#http://docs.python.org/library/csv.html#examples | |
import csv | |
import codecs | |
import difflib | |
import StringIO | |
import unittest | |
class Recoder(object):
    """
    Line iterator over an encoded byte stream.

    Each line is decoded from ``encoding`` (default UTF-8) by a codecs
    stream reader and handed back re-encoded in that same encoding.
    """

    def __init__(self, f, encoding="UTF-8"):
        reader_factory = codecs.getreader(encoding)
        self.encoding = encoding
        self.reader = reader_factory(f)

    def __iter__(self):
        return self

    def next(self):
        # Python 2 iterator protocol; StopIteration raised by the wrapped
        # reader at end of input propagates to the caller unchanged.
        decoded_line = self.reader.next()
        return decoded_line.encode(self.encoding)
class UnicodeReader(object):
    """
    A CSV reader which will iterate over lines in the CSV file "f",
    which is encoded in the given encoding.

    Rows are returned as lists of unicode strings.  Cells that the csv
    module has already converted to a non-string type (for example floats
    under ``csv.QUOTE_NONNUMERIC``) are returned unchanged.

    Parameters:
        f        -- byte stream (file-like object) holding the CSV data
        dialect  -- csv dialect, forwarded to ``csv.reader``
        encoding -- encoding of the bytes read from ``f``
        **kwds   -- extra formatting parameters forwarded to ``csv.reader``
    """

    def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        self.encoding = encoding
        decoded_lines = codecs.getreader(encoding)(f)
        # The Python 2 csv module only copes with 8-bit, ASCII-compatible
        # input, so it is always fed UTF-8 no matter what the source
        # encoding is (re-encoding to e.g. UTF-16 would inject NUL bytes).
        utf8_lines = (line.encode("utf-8") for line in decoded_lines)
        self.reader = csv.reader(utf8_lines, dialect=dialect, **kwds)

    def next(self):
        """Return the next row; raises StopIteration at end of input."""
        row = self.reader.next()
        return [cell.decode("utf-8") if isinstance(cell, bytes) else cell
                for cell in row]

    @property
    def dialect(self):
        """Dialect in use by the wrapped ``csv.reader``."""
        return self.reader.dialect

    @property
    def line_num(self):
        """Number of source lines consumed so far by the wrapped reader."""
        return self.reader.line_num

    def __iter__(self):
        return self
class UnicodeWriter(object): | |
""" | |
A CSV writer which will write rows to CSV file "f", | |
which is encoded in the given encoding. | |
""" | |
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds): | |
# Redirect output to a queue | |
self.queue = StringIO.StringIO() | |
self.writer = csv.writer(self.queue, dialect=dialect, **kwds) | |
self.stream = f | |
self.encoder = codecs.getincrementalencoder(encoding)() | |
def writerow(self, row): | |
self.writer.writerow([s.encode("utf-8") for s in row]) | |
# Fetch UTF-8 output from the queue ... | |
data = self.queue.getvalue() | |
data = data.decode("utf-8") | |
# ... and reencode it into the target encoding | |
data = self.encoder.encode(data) | |
# write to the target stream | |
self.stream.write(data) | |
# empty queue | |
self.queue.truncate(0) | |
def writerows(self, rows): | |
for row in rows: | |
self.writerow(row) | |
class UnicodeDictReader(object):
    """
    A CSV DictReader which will iterate over rows in the CSV file "f",
    which is encoded in the given encoding.

    Each row is returned as a dict whose keys and string values are
    unicode.  A ``None`` rest value/rest key and the list stored under
    ``restkey`` for over-long rows are preserved (list items are decoded).

    Parameters:
        f          -- byte stream (file-like object) holding the CSV data
        fieldnames -- optional column names; when omitted the first row of
                      the file supplies them
        restkey    -- key under which surplus cells of a long row are stored
        restval    -- value used for the missing cells of a short row
        dialect    -- csv dialect, forwarded to ``csv.DictReader``
        encoding   -- encoding of the bytes read from ``f``; byte-string
                      ``fieldnames``/``restkey``/``restval`` are assumed to
                      use it as well
        **kwds     -- extra formatting parameters for the csv reader
    """

    def __init__(self, f, fieldnames=None, restkey=None, restval=None, dialect=csv.excel, encoding="utf-8", **kwds):
        self.encoding = encoding

        def as_utf8(value):
            # Normalise caller-supplied text to the UTF-8 bytes used
            # internally; anything that is not text passes through.
            if isinstance(value, bytes):
                value = value.decode(encoding)
            if isinstance(value, unicode):
                return value.encode("utf-8")
            return value

        decoded_lines = codecs.getreader(encoding)(f)
        # The Python 2 csv module needs 8-bit, ASCII-compatible input, so
        # it is always fed UTF-8 regardless of the source encoding.
        utf8_lines = (line.encode("utf-8") for line in decoded_lines)
        if fieldnames is not None:
            fieldnames = [as_utf8(name) for name in fieldnames]
        # Forward the caller's arguments instead of hard-coded defaults.
        self.dictreader = csv.DictReader(
            utf8_lines,
            fieldnames=fieldnames,
            restkey=as_utf8(restkey),
            restval=as_utf8(restval),
            dialect=dialect,
            **kwds)

    @staticmethod
    def _to_unicode(value):
        # Decode internal UTF-8 bytes; lists appear under ``restkey``.
        if isinstance(value, bytes):
            return value.decode("utf-8")
        if isinstance(value, list):
            return [item.decode("utf-8") if isinstance(item, bytes) else item
                    for item in value]
        return value

    def next(self):
        """Return the next row as a dict; raises StopIteration at the end."""
        row = self.dictreader.next()
        return dict((self._to_unicode(key), self._to_unicode(value))
                    for key, value in row.items())

    @property
    def dialect(self):
        """Dialect in use by the wrapped ``csv.DictReader``."""
        return self.dictreader.dialect

    @property
    def fieldnames(self):
        """Column names as unicode (matching the row keys), or None."""
        names = self.dictreader.fieldnames
        if names is None:
            return None
        return [self._to_unicode(name) for name in names]

    @property
    def line_num(self):
        """Number of source lines consumed so far by the wrapped reader."""
        return self.dictreader.line_num

    def __iter__(self):
        return self
class UnicodeDictWriter(object): | |
""" | |
A CSV DictWriter which will write rows to CSV file "f", | |
which is encoded in the given encoding. | |
""" | |
def __init__(self, f, fieldnames, restval='', extrasaction='raise', dialect='excel', encoding="utf-8", *args, **kwds): | |
# Redirect output to a queue | |
self.queue = StringIO.StringIO() | |
self.writer = csv.DictWriter(self.queue, fieldnames=fieldnames, restval=restval, extrasaction=extrasaction, | |
dialect=dialect, *args, **kwds) | |
self.stream = f | |
self.encoder = codecs.getincrementalencoder(encoding)() | |
def writerow(self, row): | |
self.writer.writerow(dict([(k.encode("utf-8"),v.encode("utf-8")) for (k,v) in row.items()])) | |
# Fetch UTF-8 output from the queue ... | |
data = self.queue.getvalue() | |
data = data.decode("utf-8") | |
# ... and reencode it into the target encoding | |
data = self.encoder.encode(data) | |
# write to the target stream | |
self.stream.write(data) | |
# empty queue | |
self.queue.truncate(0) | |
def writerows(self, rows): | |
for row in rows: | |
self.writerow(row) | |
def output_to_csv(f, headers, rows, write_header=True, convert_values_to_unicode=True):
    """
    Write ``rows`` (an iterable of dicts keyed by the names in ``headers``)
    to the stream ``f`` as CSV and return ``f``.

    When ``write_header`` is true a header row is emitted first.  When
    ``convert_values_to_unicode`` is true every value is passed through
    ``unicode()`` before being written.
    """
    writer = UnicodeDictWriter(f, headers)
    if write_header:
        header_row = dict((name, name) for name in headers)
        writer.writerow(header_row)
    for record in rows:
        if convert_values_to_unicode:
            record = dict((key, unicode(value)) for key, value in record.items())
        writer.writerow(record)
    return f
# Unit tests | |
class TestUnicodeCsv(unittest.TestCase):
    """Checks that the unicode-aware CSV readers and writers round-trip
    non-ASCII text through UTF-8."""

    def setUp(self):
        pass

    def tearDown(self):
        pass

    def test_unicodereader(self):
        # Three lines, no trailing newline, encoded the way a file would be
        encoded_csv = u"word,language\n中文,Chinese\nfrançais,French".encode("UTF-8")
        rows = list(UnicodeReader(StringIO.StringIO(encoded_csv)))
        expected = [
            [u'word', u'language'],
            [u"中文", u"Chinese"],
            [u"français", u"French"],
        ]
        self.assertEqual(rows, expected)

    def test_UnicodeDictReader(self):
        encoded_csv = u"word,language\n中文,Chinese\nfrançais,French".encode("UTF-8")
        records = list(UnicodeDictReader(StringIO.StringIO(encoded_csv)))
        expected = [
            {u'word': u"中文", u'language': u"Chinese"},
            {u'word': u"français", u'language': u"French"},
        ]
        self.assertEqual(records, expected)

    def test_unicodewriter(self):
        headers = [u'word', u'language']
        table = [headers, [u"中文", u"Chinese"], [u"français", u"French"]]
        sink = StringIO.StringIO()
        writer = UnicodeWriter(sink)
        for record in table:
            writer.writerow(record)
        # The excel dialect terminates each record with CRLF
        expected_value = "".join(
            ",".join(cell.encode("UTF-8") for cell in record) + "\r\n"
            for record in table)
        self.assertEqual(sink.getvalue(), expected_value)
def test_unicodedictwriter():
    """
    Manual smoke test: write a header and two rows containing non-ASCII
    text to "test_name.csv" in the current directory.
    """
    headers = ['word', 'language']
    # Binary mode, as the Python 2 csv module requires, so the "\r\n" row
    # terminators are not translated again on platforms that distinguish
    # text files; the with-block guarantees the file is flushed and closed.
    with open("test_name.csv", "wb") as output_file:
        uwriter = UnicodeDictWriter(output_file, headers)
        uwriter.writerow(dict((h, h) for h in headers))
        uwriter.writerow({"word": u"中文", "language": "Chinese"})
        uwriter.writerow({"word": u"français", "language": "French"})
if __name__ == "__main__":
    # When executed as a script, hand control to the unittest runner.
    # The two calls below are manual entry points kept disabled.
    #test_unicodewriter()
    #test_unicodedictwriter()
    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment