Last active
November 11, 2022 16:25
-
-
Save konnov/5774100 to your computer and use it in GitHub Desktop.
Simple CSV tools that help in filtering CSV data and creating plots
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
__all__ = ['util', 'csvgrep'] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# | |
# Tranlate a CSV table (coming with a header) to a SQLite table. | |
# | |
# Igor Konnov, 2016 | |
import argparse | |
import csv | |
import getopt | |
import re | |
import sqlite3 | |
from sys import stdout | |
import sys | |
from util import * | |
class DataError(BaseException): | |
def __init__(self, msg): | |
BaseException.__init__(self, msg) | |
def use(): | |
print "Use: %s [--sniff-dialect] dbfile table_name <csv" % sys.argv[0] | |
print "" | |
sys.exit(1) | |
def export(sniff, dbfilename, table_name, infile): | |
def escape(s): | |
if s.find(",") != -1: | |
return '"%s"' % s | |
else: | |
return s | |
if sniff: | |
reader = SniffingCsvReader(infile) | |
else: | |
reader = csv.reader(infile) | |
conn = sqlite3.connect(dbfilename) | |
cursor = conn.cursor() | |
fields = None | |
lineno = 0 | |
errno = 0 | |
try: | |
for row_arr in reader: | |
lineno += 1 | |
if not fields: | |
fields = row_arr # the first row is the header | |
cols = ['`' + f + '`' + ' text' for f in fields] | |
cursor.execute("CREATE TABLE %s (%s)" \ | |
% (table_name, ",".join(cols))) | |
else: | |
placeholders = len(fields) * ["?"] | |
cursor.execute("INSERT INTO %s VALUES (%s)" \ | |
% (table_name, ",".join(placeholders)), row_arr) | |
conn.commit() | |
except Exception, e: | |
conn.rollback() | |
raise e | |
finally: | |
conn.close() | |
if errno > 0: | |
print >>sys.stderr, ("%d errors occured" % errno) | |
if __name__ == "__main__": | |
try: | |
opts, args = getopt.getopt(sys.argv[1:], "h", ["help", "sniff-dialect"]) | |
except getopt.GetoptError as err: | |
# print help information and exit: | |
print(str(err)) # will print something like "option -a not recognized" | |
use() | |
sniff = False | |
for o, a in opts: | |
if o in ("-h", "--help"): | |
use() | |
elif o in ("--sniff-dialect"): | |
sniff = True | |
else: | |
assert False, "unexpected option" | |
if len(args) < 2: | |
use() | |
export(sniff, args[0], args[1], sys.stdin) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# | |
# A filter on CSV that is similar to SELECT name1, .., name_k FROM CSV in SQL. | |
# | |
# Igor Konnov, 2013-2014 | |
import argparse | |
import csv | |
import getopt | |
import re | |
from sys import stdout | |
import sys | |
from util import * | |
class DataError(BaseException): | |
def __init__(self, msg): | |
BaseException.__init__(self, msg) | |
def use(): | |
print "Use: %s [--sniff-dialect] field1 ... field_k <csv" % sys.argv[0] | |
print "" | |
sys.exit(1) | |
def parse(args): | |
if len(args) < 1: | |
use() | |
else: | |
return args | |
def cut_csv(sniff, target_fields, infile): | |
def escape(s): | |
if s.find(",") != -1: | |
return '"%s"' % s | |
else: | |
return s | |
if sniff: | |
reader = SniffingCsvReader(infile) | |
else: | |
reader = csv.reader(infile) | |
fields = None | |
lineno=0 | |
errno=0 | |
for row_arr in reader: | |
lineno += 1 | |
if not fields: | |
fields = row_arr # the first row is the header | |
for f in target_fields: | |
if f not in fields: | |
raise DataError("Column %s not found" % f) | |
print ",".join(target_fields) | |
else: | |
row = {} | |
for i, k in enumerate(fields): | |
try: | |
row[k] = row_arr[i] | |
except IndexError, _: | |
errno += 1 | |
row[k] = "" | |
if errno <= 10: | |
print >>sys.stderr, \ | |
("ERROR: column %d not found on line %d" % (i, lineno)) | |
print ",".join([escape(row[f]) for f in target_fields]) | |
if errno > 0: | |
print >>sys.stderr, ("%d errors occured" % errno) | |
if __name__ == "__main__": | |
try: | |
opts, args = getopt.getopt(sys.argv[1:], "h", ["help", "sniff-dialect"]) | |
except getopt.GetoptError as err: | |
# print help information and exit: | |
print(str(err)) # will print something like "option -a not recognized" | |
use() | |
sniff = False | |
for o, a in opts: | |
if o in ("-h", "--help"): | |
use() | |
elif o in ("--sniff-dialect"): | |
sniff = True | |
else: | |
assert False, "unexpected option" | |
fields = parse(args) | |
cut_csv(sniff, fields, sys.stdin) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# | |
# A filter on CSV that is similar to SELECT * FROM CSV WHERE... in SQL. | |
# | |
# For instance, csvgrep "param~=.*F=3.*" -a "foo=bar" <my.csv | |
# | |
# Igor Konnov, 2013 | |
import argparse | |
import csv | |
import getopt | |
import re | |
from sys import stdout | |
import sys | |
from util import * | |
class ArgError(BaseException): | |
def __init__(self, msg): | |
BaseException.__init__(self, msg) | |
class DataError(BaseException): | |
def __init__(self, msg): | |
BaseException.__init__(self, msg) | |
EQ = "EQ" | |
RE = "RE" | |
AND = "AND" | |
OR = "OR" | |
NOT = "NOT" | |
class Expr: | |
def __init__(self, kind, ops): | |
self.kind = kind | |
self.ops = ops | |
# XXX: a simple recursive parser, I don't have time to think about it | |
class Parser: | |
def __init__(self, args): | |
self.args = args | |
self.buf = args | |
def parse(self): | |
expr = self.parse_and() | |
if not self.is_eof(): | |
raise ArgError("Expected end, found: " + " ".join(self.buf)) | |
return expr | |
def parse_and(self): | |
ops = [self.parse_or()] | |
while self.top() == "-a": | |
self.consume("-a") | |
ops.append(self.parse_or()) | |
if len(ops) > 1: | |
return Expr(AND, ops) | |
else: | |
return ops[0] | |
def parse_or(self): | |
ops = [self.parse_not()] | |
while self.top() == "-o": | |
self.consume("-o") | |
ops.append(self.parse_not()) | |
if len(ops) > 1: | |
return Expr(OR, ops) | |
else: | |
return ops[0] | |
def parse_not(self): | |
neg = False | |
while self.top() == "!": | |
self.consume("!") | |
neg = not neg | |
expr = self.parse_eq_re() | |
if neg: | |
return Expr(NOT, [expr]) | |
else: | |
return expr | |
def parse_eq_re(self): | |
tok = self.get() | |
if tok.find("~=") != -1: | |
i = tok.find("~=") | |
k, r = tok[: i], re.compile(tok[i + 2 :]) | |
return Expr(RE, [k, r]) | |
elif tok.find('=') != -1: | |
i = tok.find('=') | |
k, v = tok[: i], tok[i + 1 :] | |
return Expr(EQ, [k, v]) | |
else: | |
raise ArgError("Expected k=v or k~=re, found %s" % tok) | |
def is_eof(self): | |
return self.buf == [] | |
def top(self): | |
if self.buf == []: | |
return None | |
else: | |
return self.buf[0] | |
def get(self): | |
if self.buf == []: | |
raise ArgError("No arguments") | |
tok = self.buf[0] | |
self.buf = self.buf[1:] | |
return tok | |
def put(self, tok): | |
self.buf = [tok] + self.buf | |
def consume(self, expected): | |
if self.buf[0] <> expected: | |
raise ArgError("Expected %s, found %s" % (expected, self.buf[0])) | |
else: | |
self.buf = self.buf[1:] | |
def use(): | |
print "Use: %s [--sniff-dialect] [!] k1=v1 -a k2~=v2 -o k3=v3 <csv" % sys.argv[0] | |
print "" | |
sys.exit(1) | |
def parse(args): | |
if len(args) < 1: | |
use() | |
try: | |
expr = Parser(args).parse() | |
except ArgError, e: | |
print "Error: " + str(e) | |
use() | |
return expr | |
def row_matches(expr, row): | |
if expr.kind == AND: | |
for op in expr.ops: | |
if not row_matches(op, row): | |
return False | |
return True | |
elif expr.kind == OR: | |
for op in expr.ops: | |
if row_matches(op, row): | |
return True | |
return False | |
elif expr.kind == NOT: | |
return not row_matches(expr.ops[0], row) | |
elif expr.kind == EQ: | |
name, val = expr.ops | |
try: | |
return row[name] == val | |
except KeyError: | |
raise DataError("Column %s not found" % name) | |
elif expr.kind == RE: | |
name, rexp = expr.ops | |
try: | |
return rexp.match(row[name]) | |
except KeyError: | |
raise DataError("Column %s not found" % name) | |
def filter_csv(sniff, expr, infile): | |
def escape(s): | |
if s.find(",") != -1: | |
return '"%s"' % s | |
else: | |
return s | |
if sniff: | |
reader = SniffingCsvReader(infile) | |
else: | |
reader = csv.reader(infile) | |
fields = None | |
line = 1 | |
for row_arr in reader: | |
if not fields: | |
fields = row_arr # the first row is the header | |
print ",".join(fields) | |
else: | |
row = {} | |
for i, k in enumerate(fields): | |
row[k] = row_arr[i] | |
try: | |
if row_matches(expr, row): | |
print ",".join([escape(r) for r in row_arr]) | |
except DataError, e: | |
print >>sys.stderr, "line %d: %s" % (line, str(e)) | |
line += 1 | |
if __name__ == "__main__": | |
try: | |
opts, args = getopt.getopt(sys.argv[1:], "h", ["help", "sniff-dialect"]) | |
except getopt.GetoptError as err: | |
# print help information and exit: | |
print(str(err)) # will print something like "option -a not recognized" | |
use() | |
sniff = False | |
for o, a in opts: | |
if o in ("-h", "--help"): | |
use() | |
elif o in ("--sniff-dialect"): | |
sniff = True | |
else: | |
assert False, "unexpected option" | |
expr = parse(args) | |
filter_csv(sniff, expr, sys.stdin) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# | |
# Get two files on input: original and mask. | |
# Every non-empty cell in the mask overwrites the cell in the original. | |
# | |
# This can be used, when one wants to replace cells with values | |
# 'timeout', 'out of memory'. | |
# | |
# Igor Konnov, 2014 | |
import argparse | |
import csv | |
import getopt | |
import re | |
from sys import stdout | |
import sys | |
from util import * | |
class DataError(BaseException): | |
def __init__(self, msg): | |
BaseException.__init__(self, msg) | |
def use(): | |
print "Use: %s [--sniff-dialect] original.csv mask.csv" % sys.argv[0] | |
print "" | |
sys.exit(1) | |
def parse(args): | |
if len(args) < 2: | |
use() | |
else: | |
return args | |
def merge_csv(sniff, inp, mask): | |
inpf = open(inp, 'r') | |
if sniff: | |
inpr = SniffingCsvReader(inpf) | |
else: | |
inpr = csv.reader(inpf) | |
maskf = open(mask, 'r') | |
if sniff: | |
maskr = SniffingCsvReader(maskf) | |
else: | |
maskr = csv.reader(maskf) | |
writer = csv.writer(sys.stdout, dialect = inpr.dialect) | |
lineno = 0 | |
for row in inpr: | |
try: | |
mask_row = maskr.next() | |
except StopIteration: | |
mask_row = [] | |
out_row = row | |
mlen = len(mask_row) | |
for i, r in enumerate(row): | |
if lineno != 0 and i < mlen: | |
if mask_row[i].strip() != "": | |
out_row[i] = mask_row[i] | |
writer.writerow(row) | |
lineno += 1 | |
if __name__ == "__main__": | |
try: | |
opts, args = getopt.getopt(sys.argv[1:], "h", ["help", "sniff-dialect"]) | |
except getopt.GetoptError as err: | |
# print help information and exit: | |
print(str(err)) # will print something like "option -a not recognized" | |
use() | |
sniff = False | |
for o, a in opts: | |
if o in ("-h", "--help"): | |
use() | |
elif o in ("--sniff-dialect"): | |
sniff = True | |
else: | |
assert False, "unexpected option" | |
orig, mask = parse(args) | |
merge_csv(sniff, orig, mask) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# | |
# Join CSV files similar to UNIX paste. If one file is longer, then the other | |
# file is padded. | |
# | |
# Igor Konnov, 2014 | |
import argparse | |
import getopt | |
import csv | |
import re | |
from sys import stdout | |
import sys | |
from util import * | |
class DataError(BaseException): | |
def __init__(self, msg): | |
BaseException.__init__(self, msg) | |
def use(): | |
print "Use: %s [--sniff-dialect] file1.csv prefix1 file2.csv prefix2 ..." % sys.argv[0] | |
print "" | |
sys.exit(1) | |
def parse(args): | |
if len(args) % 2 != 0: | |
use() | |
else: | |
return (args[0::2], args[1::2]) | |
def merge_csv(sniff, files, prefixes): | |
def escape(s): | |
if s.find(",") != -1: | |
return '"%s"' % s | |
else: | |
return s | |
readers = [] | |
widths = [] | |
max_line = 0 | |
dialect = None | |
for f in files: | |
infile = open(f, 'r') | |
if sniff: | |
r = SniffingCsvReader(infile) | |
else: | |
r = csv.reader(infile) | |
dialect = r.dialect | |
readers.append(r) | |
max_line = max(r.line_num, max_line) | |
widths.append(0) | |
writer = csv.writer(sys.stdout, dialect = dialect) | |
has_more = True | |
lineno = 0 | |
while has_more: | |
has_more = False | |
row = [] | |
for i, r in enumerate(readers): | |
try: | |
row_arr = r.next() | |
if lineno == 0: | |
row_arr = [ prefixes[i] + ":" + r for r in row_arr ] | |
has_more = True | |
row += row_arr | |
widths[i] = len(row_arr) | |
except StopIteration: | |
row += [''] * widths[i] | |
except IOError: | |
row += [''] * widths[i] | |
if has_more: | |
writer.writerow(row) | |
lineno += 1 | |
if __name__ == "__main__": | |
try: | |
opts, args = getopt.getopt(sys.argv[1:], "h", ["help", "sniff-dialect"]) | |
except getopt.GetoptError as err: | |
# print help information and exit: | |
print(str(err)) # will print something like "option -a not recognized" | |
use() | |
sniff = False | |
for o, a in opts: | |
if o in ("-h", "--help"): | |
use() | |
elif o in ("--sniff-dialect"): | |
sniff = True | |
else: | |
assert False, "unexpected option" | |
files, prefixes = parse(args) | |
merge_csv(sniff, files, prefixes) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# | |
# Replace the expressions of the form (column@expr) with the (only) value | |
# matching the query: SELECT column FROM <table_name> WHERE expr. | |
# | |
# Igor Konnov, 2016 | |
import argparse | |
import csv | |
import getopt | |
import re | |
import sqlite3 | |
from sys import stdout | |
import sys | |
from util import * | |
class DataError(BaseException): | |
def __init__(self, msg): | |
BaseException.__init__(self, msg) | |
def use(): | |
print "Use: %s [--sniff-dialect] dbfile table_name <csv" % sys.argv[0] | |
print "" | |
sys.exit(1) | |
def sub_special(cursor, table_name, text): | |
before = 0 | |
pieces = [] | |
for m in re.finditer(r"\$\((.*?)@(.*?)\)", text): | |
if before < m.start(): | |
pieces.append(text[before : m.start()]) | |
cursor.execute('SELECT %s FROM %s WHERE %s'\ | |
% (m.group(1).strip(), table_name, m.group(2).strip())) | |
value = cursor.fetchone() | |
if value: | |
if cursor.fetchone(): | |
value = '>1 results!' | |
else: | |
value = value[0] # a one-element tuple | |
else: | |
value = 'no result!' | |
pieces.append(str(value)) | |
before = m.end() + 1 | |
pieces.append(text[before:]) | |
return "".join(pieces) | |
def sub_general(cursor, text): | |
before = 0 | |
pieces = [] | |
for m in re.finditer(r"\[(.*?)\]", text): | |
if before < m.start(): | |
pieces.append(text[before : m.start()]) | |
cursor.execute(m.group(1)) | |
value = cursor.fetchone() | |
if value: | |
if cursor.fetchone(): | |
value = '>1 results!' | |
else: | |
value = value[0] # a one-element tuple | |
else: | |
value = 'no result!' | |
pieces.append(str(value)) | |
before = m.end() + 1 | |
pieces.append(text[before:]) | |
return "".join(pieces) | |
def export(sniff, dbfilename, table_name, infile): | |
def escape(s): | |
if s.find(",") != -1: | |
return '"%s"' % s | |
else: | |
return s | |
if sniff: | |
reader = SniffingCsvReader(infile) | |
else: | |
reader = csv.reader(infile) | |
conn = sqlite3.connect(dbfilename) | |
cursor = conn.cursor() | |
lineno = 0 | |
errno = 0 | |
committed = False | |
try: | |
for row_arr in reader: | |
lineno += 1 | |
n = len(row_arr) | |
for i, v in enumerate(row_arr): | |
newv = sub_general(cursor, sub_special(cursor, table_name, v)) | |
sys.stdout.write(newv) | |
if i < n - 1: | |
sys.stdout.write(",") | |
sys.stdout.write("\n") | |
conn.commit() | |
committed = True | |
finally: | |
if not committed: | |
conn.rollback() | |
conn.close() | |
if errno > 0: | |
print >>sys.stderr, ("%d errors occured" % errno) | |
if __name__ == "__main__": | |
try: | |
opts, args = getopt.getopt(sys.argv[1:], "h", ["help", "sniff-dialect"]) | |
except getopt.GetoptError as err: | |
# print help information and exit: | |
print(str(err)) # will print something like "option -a not recognized" | |
use() | |
sniff = False | |
for o, a in opts: | |
if o in ("-h", "--help"): | |
use() | |
elif o in ("--sniff-dialect"): | |
sniff = True | |
else: | |
assert False, "unexpected option" | |
if len(args) < 2: | |
use() | |
export(sniff, args[0], args[1], sys.stdin) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# | |
# Replace the value of a column, optionaly, using a regular expression. | |
# In case, the regular expression on the whole line is given, | |
# only the lines matching the expression are to be replaced. | |
# | |
# For instance, csvsub param 'N=([0-9]+);.*' '\1' | |
# | |
# Igor Konnov, 2013-2014 | |
import argparse | |
import csv | |
import getopt | |
import re | |
from sys import stdout | |
import sys | |
from util import * | |
class DataError(BaseException): | |
def __init__(self, msg): | |
BaseException.__init__(self, msg) | |
def use(): | |
print "Use: %s [--sniff-dialect] field field_regex replacement [line_regex] <csv" % sys.argv[0] | |
print "" | |
sys.exit(1) | |
def parse(args): | |
if len(args) < 3: | |
use() | |
field, regex, subst = args[0:3] | |
if len(args) > 3: | |
line_regex = args[3] | |
else: | |
line_regex = "" | |
return field, regex, subst, line_regex | |
def sub(sniff, field, regex, subst, line_regex, infile): | |
def sub_field(f, val): | |
if f == field: | |
return re.sub(regex, subst, val) | |
else: | |
return val | |
if sniff: | |
reader = SniffingCsvReader(infile) | |
else: | |
reader = csv.reader(infile) | |
writer = csv.writer(sys.stdout, dialect = reader.dialect, escapechar = '"') | |
fields = None | |
if line_regex != "": | |
lre = re.compile(line_regex) | |
else: | |
lre = None | |
for row_arr in reader: | |
out_row = row_arr | |
if not fields: | |
fields = row_arr # the first row is the header | |
else: | |
needs_replace = True | |
if lre: | |
if not lre.match(",".join(row_arr)): | |
needs_replace = False | |
if needs_replace: | |
for i, f in enumerate(fields): | |
out_row[i] = sub_field(f, row_arr[i]) | |
writer.writerow(out_row) | |
if __name__ == "__main__": | |
try: | |
opts, args = getopt.getopt(sys.argv[1:], "h", ["help", "sniff-dialect"]) | |
except getopt.GetoptError as err: | |
# print help information and exit: | |
print(str(err)) # will print something like "option -a not recognized" | |
use() | |
sniff = False | |
for o, a in opts: | |
if o in ("-h", "--help"): | |
use() | |
elif o in ("--sniff-dialect"): | |
sniff = True | |
else: | |
assert False, "unexpected option" | |
field, regex, subst, line_regex = parse(args) | |
sub(sniff, field, regex, subst, line_regex, sys.stdin) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# | |
# Replace the value of a column using python 'eval'. | |
# The current value of the column is passed into variable '_1'. | |
# The value of a column 'x' can be acccessed as r['x']. | |
# The script imports all functions from math, so you can use | |
# them with _1. | |
# | |
# Warning: this might be extremely insecure, as one can write | |
# virtually anything in 'eval'. Don't give root permissions to | |
# this script. | |
# | |
# Example: csvsubeval memory '([0-9]+)' '_1 / 1024' | |
# | |
# Igor Konnov, 2014 | |
import argparse | |
import csv | |
import getopt | |
import io | |
import math | |
import re | |
from sys import stdout | |
import sys | |
from util import * | |
def use(): | |
print("Use: {} [--sniff-dialect] [-i|--include-empty] field 'expr(_1)' <csv".format(sys.argv[0])) | |
print("") | |
print(" You can refer to the current version of the column as _1,") | |
print(" or to a value of a column x as r[x].") | |
print(" Note that empty fields are ignored by default.") | |
print("") | |
print(" When -i or --include-empty is set, empty fields are not ignored") | |
print("") | |
sys.exit(1) | |
def parse(): | |
try: | |
opts, args = getopt.getopt(sys.argv[1:], "hi", | |
["help", "include-empty", "sniff-dialect"]) | |
except getopt.GetoptError as err: | |
# print help information and exit: | |
print(str(err)) # will print something like "option -a not recognized" | |
use() | |
sys.exit(2) | |
include_empty = False | |
sniff = False | |
for o, a in opts: | |
if o in ("-h", "--help"): | |
use() | |
sys.exit() | |
elif o in ("-i", "--include-empty"): | |
include_empty = True | |
elif o in ("--sniff-dialect"): | |
sniff = True | |
else: | |
assert False, "unexpected option" | |
if len(args) < 2: | |
use() | |
return args[0], args[1], include_empty, sniff | |
def sub(sniff, field, replacement, include_empty, infile): | |
def sub_field(f, val, row): | |
if f == field and (include_empty or val.strip() != ''): | |
try: | |
loc = { "_1": val, "r": row } | |
glob = {} | |
for k, v in __builtins__.__dict__.iteritems(): | |
glob[k] = v | |
for k, v in math.__dict__.iteritems(): | |
glob[k] = v | |
return eval(replacement, glob, loc) | |
except ValueError as e: | |
print >>sys.stderr, "Error on %s=%s: %s" % (f, val, str(e)) | |
return val | |
else: | |
return val | |
def escape(s): | |
return s.replace('\\', '\\\\') | |
if sniff: | |
reader = SniffingCsvReader(infile) | |
else: | |
reader = csv.reader(infile) | |
writer = csv.writer(sys.stdout, dialect = reader.dialect, escapechar = '"') | |
fields = None | |
for row_arr in reader: | |
out_row = row_arr | |
if not fields: | |
fields = row_arr # the first row is the header | |
else: | |
row = {f: '"%s"' % escape(row_arr[i]) for i, f in enumerate(fields)} | |
for i, f in enumerate(fields): | |
out_row[i] = sub_field(f, row_arr[i], row) | |
writer.writerow(out_row) | |
if __name__ == "__main__": | |
field, replacement, include_empty, sniff = parse() | |
sub(sniff, field, replacement, include_empty, sys.stdin) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
import io | |
import sys | |
class SniffingCsvReader: | |
def __init__(self, infile): | |
rawfile = io.open(infile.fileno(), mode='rb', buffering = 8192) | |
bufreader = io.BufferedReader(rawfile, buffer_size = 8192) | |
try: | |
self.dialect = csv.Sniffer().sniff(bufreader.peek(8192)) | |
except csv.Error as e: | |
print >>sys.stderr, "CSV dialect unknown: " + e.message | |
print >>sys.stderr, "Falling back to the default (excel)" | |
self.dialect = csv.get_dialect('excel') | |
self._reader = csv.reader(bufreader, dialect = self.dialect) | |
def __iter__(self): | |
return self | |
def next(self): | |
return self._reader.next() | |
def line_num(self): | |
return self._reader.line_num | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment