Skip to content

Instantly share code, notes, and snippets.

@malev
Last active August 29, 2015 14:00
Show Gist options
  • Save malev/11010799 to your computer and use it in GitHub Desktop.
Save malev/11010799 to your computer and use it in GitHub Desktop.
Coreferencer
class Answer(object):
    """A reply given for a group of row ids.

    ``ans`` is either the selected id (an ``int``) or some other marker
    such as ``'n'``.  When it is an ``int`` it is moved to the front of
    the stored ``indices``.

    >>> answer = Answer(1, [1,3])
    >>> answer.included()
    True
    >>> answer.excluded()
    False
    >>> answer.includes()
    [1, 3]
    >>> answer = Answer(2, [1, 2, 3])
    >>> answer.includes()
    [2, 1, 3]
    >>> shared = [1, 2, 3]
    >>> Answer(3, shared).includes()
    [3, 1, 2]
    >>> shared
    [1, 2, 3]
    """

    def __init__(self, ans, indices, klass='include', new_row=None):
        """Store the reply and reorder the ids.

        :param ans: selected id (``int``) or another marker such as ``'n'``.
        :param indices: ids the reply refers to; copied, never mutated.
        :param klass: ``'include'`` (default) or anything else for "excluded".
        :param new_row: optional dict of extra data; a fresh dict is
            created per instance when omitted.
        :raises ValueError: if ``ans`` is an ``int`` not present in ``indices``.
        """
        self.klass = klass
        self.ans = ans
        # Private copy: order() must not reorder the list the caller still holds.
        self.indices = list(indices)
        # A ``None`` default avoids one dict being shared by every instance.
        # NOTE: this attribute shadows the ``new_row`` method defined below.
        self.new_row = {} if new_row is None else new_row
        self.order()

    def order(self):
        """Move ``ans`` to the front of ``indices`` when it is a plain int."""
        # Exact type check kept on purpose: only plain ints trigger reordering.
        if type(self.ans) is int:
            self.indices.remove(self.ans)
            self.indices.insert(0, self.ans)

    def included(self):
        """Return True when the reply is of the ``'include'`` kind."""
        return self.klass == 'include'

    def excluded(self):
        """Return True when the reply is not of the ``'include'`` kind."""
        return not self.included()

    def new_row(self):
        # Unreachable through an instance: __init__ assigns an attribute with
        # the same name, so ``answer.new_row`` is the dict, not this method.
        # Kept so the class-level name keeps existing.
        return self.new_row

    def includes(self):
        """Return the (reordered) ids."""
        return self.indices

    def excludes(self):
        """Return the (reordered) ids; same list as :meth:`includes`."""
        return self.indices
if __name__ == '__main__':
    # Run as a script: execute the doctests embedded in this module.
    from doctest import testmod
    testmod()
import csv
class CSVHandler(object):
    """Rows of a CSV file held in memory, looked up by the id in column 0.

    The first line of the file is treated as a header and dropped.  Ids
    that parse as integers are stored as ``int``; others stay strings.

    >>> csv_handler = CSVHandler()
    >>> csv_handler[1806][0]
    1806
    >>> csv_handler.find_dup()
    >>> csv_handler.addresses[0]
    [1806, 2376]
    """

    def __init__(self, csv_file='file_full.csv'):
        """Load ``csv_file`` (defaults to the original ``file_full.csv``)."""
        self.csv_file = csv_file
        self.id_index = 0        # column holding the row id
        self.address_index = 9   # column compared by find_dup/search_similars
        self.rows = []
        self.addresses = []
        self.populate()

    def populate(self):
        """Read every data row of the file into ``self.rows``."""
        with open(self.csv_file, 'r') as csvfile:
            reader = csv.reader(csvfile)
            # Skip the header; the default keeps an empty file from raising.
            next(reader, None)
            for row in reader:
                if not row:
                    # csv.reader yields [] for blank lines; nothing to index.
                    continue
                try:
                    row[self.id_index] = int(row[self.id_index])
                except ValueError:
                    # Non-numeric id: keep the original string.
                    pass
                self.rows.append(row)

    def __getitem__(self, index):
        """Return the first row whose id equals ``index``.

        :raises KeyError: if no row has that id.
        """
        for row in self.rows:
            if row[self.id_index] == index:
                return row
        raise KeyError(index)

    def find_dup(self):
        """Group row ids by identical (cleaned) address into ``self.addresses``.

        Each id ends up in exactly one group; ids already present in
        ``self.addresses`` are skipped, so calling this twice adds nothing.
        """
        seen = set()
        for group in self.addresses:
            seen.update(group)
        for row in self.rows:
            if row[self.id_index] in seen:
                continue
            group = self.search_similars(row[self.address_index])
            self.addresses.append(group)
            seen.update(group)

    def search_similars(self, address):
        """Return the ids of every row whose address is similar to ``address``."""
        return [row[self.id_index] for row in self.rows
                if self.similar(address, row[self.address_index])]

    def similar(self, str1, str2):
        """Return True when both strings are equal after :meth:`clean`."""
        return self.clean(str1) == self.clean(str2)

    def clean(self, cad):
        """Lower-case ``cad`` and strip surrounding whitespace."""
        return cad.lower().strip()
if __name__ == '__main__':
    # Run as a script: execute the doctests embedded in this module.
    from doctest import testmod
    testmod()
import csv
import time
from storer import Storer
from csv_handler import CSVHandler
class Exporter:
    """Write the stored decisions to a new, timestamp-named CSV file.

    The output holds one line per included group (the row of the group's
    first id plus the group size) followed by one line per entry of
    ``new_rows.csv`` (with its last column replaced by a count).
    """

    def __init__(self):
        self.include_csv = 'include.csv'
        self.exclude_csv = 'exclude.csv'
        self.new_rows_csv = 'new_rows.csv'
        self.filename = self.set_filename()
        self.storer = Storer()
        self.csv_handler = CSVHandler()

    def call(self):
        """Export included rows, then new rows, and report the file name."""
        self.write(self.add_includes())
        self.write(self.add_new_rows())
        print("Exported in %s" % self.filename)

    def write(self, rows):
        """Append ``rows`` to the output file, echoing each one to stdout."""
        with open(self.filename, 'a') as csvfile:
            writer = csv.writer(csvfile)
            for row in rows:
                print(row)
                writer.writerow(row)

    def add_includes(self):
        """Return one output row per included group.

        Each row is a copy of the CSV row of the group's first id with the
        group size appended.
        """
        output = []
        for included in self.storer.included():
            if not included:
                # Empty group (e.g. a blank line in the file): no id to look up.
                continue
            # Copy so the appended count never leaks into the handler's row.
            row = list(self.csv_handler[included[0]])
            row.append(len(included))
            output.append(row)
        return output

    def add_new_rows(self):
        """Return the processed rows of ``new_rows.csv``.

        A missing file means nothing was recorded and yields ``[]``.
        """
        try:
            csvfile = open(self.new_rows_csv, 'r')
        except IOError:
            return []
        with csvfile:
            # Blank lines come back as [] and have no last column to process.
            return [self.process(row) for row in csv.reader(csvfile) if row]

    def process(self, row):
        """Replace the last column (a Python literal) by its length.

        :raises ValueError: if the last column is not a plain literal.
        """
        # SECURITY: the original used eval() on file contents.  literal_eval
        # parses literals such as "[1806, 1031]" without executing code.
        import ast
        excluded = ast.literal_eval(row[-1])
        row[-1] = len(excluded)
        return row

    def set_filename(self):
        """Return ``<unix timestamp>.csv`` for the current time."""
        return str(int(time.time())) + '.csv'
if __name__ == "__main__":
    # Run as a script: build an exporter and produce the export file.
    exporter = Exporter()
    exporter.call()
from csv_handler import CSVHandler
from storer import Storer
from question import Question
class Interpreter:
    """Walk the address groups, ask about the ambiguous ones, store the result.

    >>> from answer import Answer
    >>> answer = Answer('n', [1806, 1031], 'exclude', {'member_of': "Rolling Stones"})
    >>> interpreter = Interpreter()
    >>> interpreter.build_row(answer)[8]
    'Rolling Stones'
    """

    def __init__(self):
        self.csv_handler = CSVHandler()
        self.storer = Storer()

    def call(self):
        """Process every address group found by the CSV handler.

        Single-id groups are stored as included unless already recorded;
        larger groups are asked about unless already answered.
        """
        self.csv_handler.find_dup()
        for address in self.csv_handler.addresses:
            if len(address) == 1:
                if not self.storer.is_included(address):
                    self.storer.include(address)
            else:
                question = Question(address)
                if not question.already_asked():
                    self.store(question.ask())
        print("You are done")

    def store(self, answer):
        """Persist ``answer`` as an include or as an exclusion with a new row."""
        if answer.included():
            self.storer.include(answer.includes())
        elif answer.excluded():
            self.storer.new_row(answer.excludes(), self.build_row(answer))

    def build_row(self, answer):
        """Return a copy of the answer's first row with column 8 replaced.

        Column 8 is set to ``answer.new_row['member_of']``.  The row held by
        the CSV handler is left untouched, so later lookups still see the
        original value.
        """
        index = answer.indices[0]
        row = list(self.csv_handler[index])
        row[8] = answer.new_row['member_of']
        return row
if __name__ == '__main__':
    # Run as a script: start the interactive review session.
    interpreter = Interpreter()
    interpreter.call()
from answer import Answer
from storer import Storer
from csv_handler import CSVHandler
class NewRow:
    """Show a group of rows and prompt for a "member of" value for them."""

    def __init__(self, indices):
        self.indices = indices
        self.storer = Storer()
        self.csv_handler = CSVHandler()
        self.rows = {}
        self.populate()

    def ask(self):
        """Print every row of the group, prompt once, and wrap the reply.

        Returns an ``Answer`` of kind ``'exclude'`` whose ``new_row`` dict
        carries the typed text under ``'member_of'``.
        """
        print('*' * 10)
        for fid in self.indices:
            print(self.row_label(fid))
        member_of = raw_input("Type a member of: ")
        extra = {'member_of': member_of}
        return Answer('n', self.indices, 'exclude', extra)

    def populate(self):
        """Cache the CSV row of every id in ``self.indices``."""
        handler = self.csv_handler
        for fid in self.indices:
            self.rows[fid] = handler[fid]

    def row_label(self, index):
        """Return the multi-line text block describing the row ``index``."""
        record = self.rows[index]
        template = (
            "\n"
            "FID: %(fid)s | Full name: %(full_name)s\n"
            "DBS: %(dbs)s | Provider: %(provider)s\n"
            "Member of: %(member)s\n"
            "Address: %(address)s\n"
        )
        values = dict(
            fid=record[0],
            full_name=self.full_name(index),
            dbs=record[5],
            provider=record[6],
            member=record[8],
            address=record[9],
        )
        return template % values

    def full_name(self, index):
        """Return columns 2 and 4 of the row joined by a single space."""
        record = self.rows[index]
        first, last = record[2], record[4]
        return "%s %s" % (first, last)

    def showable_columns(self):
        """Return the mapping of column position to display label."""
        pairs = [
            (0, 'FID'),
            (2, 'First name'),
            (4, 'Last name'),
            (5, 'DBS'),
            (6, 'Provider'),
            (8, 'Member of'),
            (9, 'Address'),
        ]
        return dict(pairs)
if __name__ == '__main__':
    # Run as a script: execute the doctests embedded in this module.
    from doctest import testmod
    testmod()
from storer import Storer
from csv_handler import CSVHandler
from answer import Answer
from new_row import NewRow
class Question(object):
    """Ask which id of a group of rows should be kept.

    >>> storer = Storer()
    >>> storer.include([1806, 1031])
    [1806, 1031]
    >>> question = Question([1806, 1031])
    >>> question.already_asked()
    True
    >>> question = Question([1253, 729])
    >>> question.already_asked()
    False

    #>>> question = Question([1806, 1031])
    #>>> print question.ask()
    """

    def __init__(self, indices):
        self.indices = indices
        self.storer = Storer()
        self.csv_handler = CSVHandler()
        self.rows = {}
        self.populate()

    def showable_columns(self):
        """Return the mapping of column position to display label."""
        return {
            0: 'FID',
            2: 'First name',
            4: 'Last name',
            5: 'DBS',
            6: 'Provider',
            8: 'Member of',
            9: 'Address'
        }

    def ask(self):
        """Prompt until a valid reply is typed and return the resulting answer.

        ``'n'`` delegates to ``NewRow``; a listed FID yields an ``Answer``
        for that id.
        """
        # Prompt first, validate afterwards: validating a placeholder value
        # up front printed the error message before anything was asked.
        while True:
            print("*" * 10)
            for index in self.indices:
                print(self.row_label(index))
            ans = raw_input("Select a FID or 'n': ").strip().lower()
            if self.valid(ans):
                break
        if ans == 'n':
            return NewRow(self.indices).ask()
        return Answer(int(ans), self.indices)

    def valid(self, ans):
        """Return True for ``'n'`` or for an integer listed in ``self.indices``.

        Any other reply prints the error message and returns False.
        """
        if ans == 'n':
            return True
        try:
            ans = int(ans)
        except ValueError:
            self.show_error()
            return False
        if ans not in self.indices:
            self.show_error()
            return False
        return True

    def show_error(self):
        """Tell the user the reply was not accepted."""
        print("Error, please retry!")

    def row_label(self, index):
        """Return one ``label: value`` line per showable column of the row."""
        row = self.rows[index]
        # Sorted by column position so the output never depends on dict order.
        columns = sorted(self.showable_columns().items())
        return "".join("%s: %s\n" % (title, row[position])
                       for position, title in columns)

    def populate(self):
        """Cache the CSV row of every id in ``self.indices``."""
        for index in self.indices:
            self.rows[index] = self.csv_handler[index]

    def already_asked(self):
        """Return True when the group is already stored as included or excluded."""
        indices = self.indices
        return (self.storer.is_included(indices) or
                self.storer.is_excluded(indices))
if __name__ == '__main__':
    # Run as a script: execute the doctests embedded in this module.
    from doctest import testmod
    testmod()
import csv
class Storer(object):
    """Append-only CSV storage for included groups, excluded groups and new rows.

    >>> import os, tempfile
    >>> tmp = tempfile.mkdtemp()
    >>> storer = Storer(os.path.join(tmp, 'include.csv'),
    ...                 os.path.join(tmp, 'exclude.csv'),
    ...                 os.path.join(tmp, 'new_rows.csv'))
    >>> storer.include([1,2,3])
    [1, 2, 3]
    >>> storer.exclude([4, 5])
    [4, 5]
    >>> storer.new_row([6, 7], ['title'])
    ['title', [6, 7]]
    >>> storer.included()
    [[1, 2, 3]]
    >>> storer.excluded()
    [[4, 5], [6, 7]]
    >>> storer.is_included([2,1,3])
    True
    >>> storer.is_included([9,2,3])
    False
    >>> storer.is_excluded([6,7])
    True
    """

    def __init__(self, include_csv='include.csv', exclude_csv='exclude.csv',
                 new_rows_csv='new_rows.csv'):
        """Remember the three file paths; defaults match the original names."""
        self.include_csv = include_csv
        self.exclude_csv = exclude_csv
        self.new_rows_csv = new_rows_csv

    def include(self, ids):
        """Append ``ids`` as one line of the include file and return them."""
        self.write(self.include_csv, ids)
        return ids

    def exclude(self, ids):
        """Append ``ids`` as one line of the exclude file and return them."""
        self.write(self.exclude_csv, ids)
        return ids

    def new_row(self, ids, info):
        """Record ``ids`` as excluded and store ``info`` as a new row.

        :param ids: list of ids the new row replaces.
        :param info: list of column values; ``ids`` is appended to it as a
            single trailing element (written to CSV as its ``str()``).
        :returns: the stored row, ``info + [ids]``.
        """
        new_row = info + [ids]
        self.write(self.exclude_csv, ids)
        self.write(self.new_rows_csv, new_row)
        return new_row

    def included(self):
        """Return every group stored in the include file."""
        return self.read(self.include_csv)

    def excluded(self):
        """Return every group stored in the exclude file."""
        return self.read(self.exclude_csv)

    def is_included(self, ids):
        """Return True if some included group contains every one of ``ids``."""
        return self._covered_by(self.included(), ids)

    def is_excluded(self, ids):
        """Return True if some excluded group contains every one of ``ids``."""
        return self._covered_by(self.excluded(), ids)

    def _covered_by(self, groups, ids):
        """Return True if any group in ``groups`` contains all of ``ids``."""
        return any(all(item in group for item in ids) for group in groups)

    def read(self, filename):
        """Return the lines of ``filename`` as lists of ints.

        A file that cannot be opened is treated as empty; blank lines are
        skipped.

        :raises ValueError: if a cell is not an integer.
        """
        output = []
        try:
            with open(filename, 'r') as csvfile:
                for row in csv.reader(csvfile):
                    if row:
                        output.append([int(index) for index in row])
        except IOError:
            # Deliberate best effort: nothing has been stored yet.
            pass
        return output

    def write(self, filename, ids):
        """Append ``ids`` as one CSV line to ``filename``."""
        with open(filename, 'a') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(ids)
if __name__ == '__main__':
    # Run as a script: execute the doctests embedded in this module.
    from doctest import testmod
    testmod()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment