Last active
August 29, 2015 13:56
-
-
Save markwatson/8813314 to your computer and use it in GitHub Desktop.
Reads CSV files while handling all types of edge cases. (Built as an exercise - probably not useful)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import unittest | |
class CsvReader(object):
    """
    Reads CSV files while handling all types of edge cases.

    The reader is deliberately lenient about quoting:

    * A quote only opens a quoted field when nothing but whitespace precedes
      it in the current field; that leading whitespace is discarded.
    * Inside a quoted field, commas and line breaks are ordinary text.
    * Whitespace between a closing quote and the next delimiter is dropped.
      If non-blank text follows the closing quote instead, the quote is put
      back as a literal character.
    * A quote that appears after non-blank text is kept as a literal.
    * A doubled quote (``""``) is kept verbatim; it is not unescaped.
    """

    # Single alternation, tried left to right at every position, so exactly
    # one token is produced per match and ``""`` always wins over ``"``.
    _TOKEN_RE = re.compile(
        r'(?P<double_quo>"")'
        r'|(?P<quo>")'
        r'|(?P<comma>,)'
        r'|(?P<eol>\n\r|\r\n|\n|\r)'
        r'|(?P<other>[^,"\n\r]+)'
    )

    def __init__(self, lines):
        """
        A new CSV reader with the given lines.

        ``lines`` is an iterable of strings. Each string may or may not end
        with a line break; rows are split only on line-break characters that
        occur outside quoted fields.
        """
        self.__lines = lines

    def lines(self):
        """
        Return all parsed rows as a list of lists of strings.
        """
        return list(self.iter_lines())

    def iter_lines(self):
        """
        Yield the resulting rows, one list of field strings per row.

        A final row without a terminating line break is still yielded, but
        no extra row is produced after a trailing line break, and empty
        input yields nothing. A blank line between rows yields ``['']``.
        """
        in_quo = False
        just_left_quo = False
        # True once any token has been consumed since the last row was
        # emitted; guards against a phantom [''] row at end of input.
        row_pending = False
        item = []
        items = []
        for token, lexeme in self.__tokenizer():
            row_pending = True
            if in_quo:
                if token == 'quo':
                    in_quo = False
                    just_left_quo = True
                else:
                    # Commas and line breaks are plain text inside quotes.
                    item.append(lexeme)
            elif token == 'comma':
                just_left_quo = False
                items.append(''.join(item))
                item = []
            elif token == 'eol':
                items.append(''.join(item))
                yield items
                item = []
                items = []
                # Quote state must not leak into the next row.
                just_left_quo = False
                row_pending = False
            elif token == 'quo' and not ''.join(item).strip():
                # Opening quote: throw away the whitespace seen before it.
                item = []
                in_quo = True
            elif just_left_quo:
                just_left_quo = False
                if lexeme.strip():
                    # We made a mistake, let's add the quote back in
                    item.append('"')
                    item.append(lexeme)
            else:
                item.append(lexeme)
        if row_pending:
            items.append(''.join(item))
            yield items

    def __tokenizer(self):
        """
        Yield ``(token_name, lexeme)`` pairs for every input line in order.

        Token names are ``double_quo``, ``quo``, ``comma``, ``eol`` and
        ``other``. Every character belongs to exactly one token, so the
        lexemes concatenate back to the original input.
        """
        for line in self.__lines:
            if not line:
                continue
            for match in self._TOKEN_RE.finditer(line):
                yield match.lastgroup, match.group(0)
class TestCsvReader(unittest.TestCase):
    """
    Unit tests for CsvReader.
    """

    def test_quote_handling(self):
        """
        Messy quoting (padded quotes, a quoted field spanning two input
        lines, stray quotes inside bare text) parses into the expected rows.
        """
        raw_lines = [
            'a,1,22,ff, "f" , "3 "," 2" , "another\n',
            ' whatever, yeah," , hohoho,',
            'slkdfjslkdfj oops, heres a quote: " blah, quote:"\n',
            # This isn't handled perfectly, but it's good enough.
            'tricky: "blah" yeah, what"yeah", "yeah"blah,',
        ]
        first_row = [
            'a', '1', '22', 'ff', 'f', '3 ', ' 2',
            'another\n whatever, yeah,',
            ' hohoho',
            'slkdfjslkdfj oops',
            ' heres a quote: " blah',
            ' quote:"',
        ]
        second_row = ['tricky: "blah" yeah', ' what"yeah"', 'yeah"blah', '']
        parsed = CsvReader(raw_lines).lines()
        self.assertEqual(parsed, [first_row, second_row])
# Run the unit tests above when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment