Skip to content

Instantly share code, notes, and snippets.

@mahmoudimus
Created September 13, 2010 01:09
Show Gist options
  • Save mahmoudimus/576675 to your computer and use it in GitHub Desktop.
Save mahmoudimus/576675 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
(c) 2010 mahmoud abdelkader (http://blog.mahmoudimus.com/)
from blogpost: http://blog.mahmoudimus.com/reading-and-writing-null-terminated-csv-files-in-python
This file is released under the WTFPL License (http://sam.zoy.org/wtfpl/)
"""
import logging
import csv
import struct
# Use the pure-Python StringIO rather than cStringIO: cStringIO cannot
# hold unicode strings that contain non-ASCII characters.
from StringIO import StringIO
logger = logging.getLogger(__name__)

# Record terminator used by the dialect and the reader below.  It is spelled
# as a native string literal: struct.pack('B', 0) gives the same value on
# Python 2, but gives bytes on Python 3, where csv only accepts a text
# lineterminator and text characters never compare equal to bytes.
null_byte = "\0"


class null_terminated(csv.excel):
    """Excel-style CSV dialect whose records end in a null byte."""
    lineterminator = null_byte


# Make the dialect available by name to csv.reader / csv.writer / DictWriter.
csv.register_dialect("null-terminated", null_terminated)
# Demo: write a small CSV file whose records end in a null byte.
with open("/tmp/file.csv", "w") as writeable_csv_file:
    dwriter = csv.DictWriter(
        writeable_csv_file,
        dialect="null-terminated",
        fieldnames=["id", "field"],
    )
    # Header row, written through the plain csv writer that DictWriter wraps.
    header = dwriter.fieldnames
    dwriter.writer.writerow(header)
    # One record per sample value, numbered from zero.
    dwriter.writerows(
        {"id": number, "field": value}
        for number, value in enumerate(("foo", "bar", "baz", "bif"))
    )
class NullTerminatedDelimiterReader(object):
    """
    A CSV reader which iterates over the records of the file-like object
    'f', where records are terminated by a null byte instead of a newline.

    The stream is consumed one character at a time through f.read(1).
    Each record is parsed by csv.reader and returned as a list of fields.
    An empty record (two adjacent null bytes) is returned as an empty list,
    which is what csv.reader produces for a blank line.  A final record
    that is not followed by a null byte is still returned.

    Arguments:
        f: object with a read(size) method returning text.
        dialect: dialect name or class handed to csv.reader.
        **kwds: keyword arguments; the ones that are csv formatting
            parameters (delimiter, quotechar, ...) are handed to
            csv.reader, any others are ignored.
    """

    # Keyword names csv.reader accepts as formatting parameters.
    _FORMAT_PARAMETERS = frozenset((
        "delimiter", "doublequote", "escapechar", "lineterminator",
        "quotechar", "quoting", "skipinitialspace", "strict",
    ))

    def __init__(self, f, dialect="null-terminated", **kwds):
        # satisfying DictReader instance
        self._line_num = 0
        self.fobj = f
        self.dialect = dialect
        # Only real formatting parameters are kept, so unrelated keywords
        # passed along by a wrapper cannot make csv.reader fail.
        self.fmtparams = dict(
            (name, value) for name, value in kwds.items()
            if name in self._FORMAT_PARAMETERS
        )
        self.string_io = StringIO()
        # _read() is a generator: nothing is read from f until iteration.
        self.reader = self._read()

    def _properly_parse_row(self, current_string):
        """Parse the text of one record and return its list of fields."""
        buf = self.string_io
        # Reset the scratch buffer here so every call starts clean.
        buf.seek(0)
        buf.truncate()
        buf.write(current_string)
        # seek to the first byte
        buf.seek(0)
        rows = csv.reader(buf, dialect=self.dialect, **self.fmtparams)
        # An empty record gives csv.reader nothing to parse; default to []
        # rather than letting StopIteration escape into the generator.
        return next(rows, [])

    def _read(self):
        """Generate one parsed row for each record in the stream."""
        current_string = ""
        while True:
            char = self.fobj.read(1)
            if not char:
                # end of stream
                break
            if char != null_byte:
                current_string += char
                continue
            # Count the record before handing it out, so line_num is
            # already up to date when the consumer looks at it.
            self._line_num += 1
            yield self._properly_parse_row(current_string)
            # clear row
            current_string = ""
        # A last record may be missing its terminating null byte.
        if current_string:
            self._line_num += 1
            yield self._properly_parse_row(current_string)
        # Falling off the end finishes the generator; raising StopIteration
        # by hand inside a generator is an error under PEP 479.

    @property
    def line_num(self):
        """Number of records read from the stream so far."""
        return self._line_num

    def next(self):
        """Return the next row; raises StopIteration when exhausted."""
        return next(self.reader)

    # Python 3 spelling of the iterator protocol.
    __next__ = next

    def __iter__(self):
        return self
class NullByteDictReader(csv.DictReader):
    """
    A csv.DictReader for files whose records are terminated by a null byte.

    The arguments are those of csv.DictReader, in the same order.  They are
    named explicitly so that positional 'fieldnames', 'restkey' and
    'restval' go to DictReader only; the row reader receives just the
    file, the dialect and the keyword arguments.

    Arguments:
        f: file-like object to read from.
        fieldnames: column names; when None, DictReader takes them from
            the first record.
        restkey: key under which surplus fields of a long row are stored.
        restval: value used for fields missing from a short row.
        dialect: dialect name or class used to parse each record.
        *args, **kwds: passed on to csv.DictReader unchanged.
    """

    def __init__(self, f, fieldnames=None, restkey=None, restval=None,
                 dialect="null-terminated", *args, **kwds):
        csv.DictReader.__init__(self, f, fieldnames, restkey, restval,
                                dialect, *args, **kwds)
        # DictReader.__init__ created a newline-based csv.reader; replace
        # it with one that splits records on the null byte.
        self.reader = NullTerminatedDelimiterReader(f, dialect, **kwds)
# Demo: read the file back, one dict per null-terminated record.
with open("/tmp/file.csv", "r") as f:
    dialect = "null-terminated"
    for line in NullByteDictReader(f, dialect=dialect):
        # A single pre-formatted argument prints the same text whether
        # print is a statement (Python 2) or a function (Python 3).
        print("%s %s" % (line["id"], line["field"]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment