Skip to content

Instantly share code, notes, and snippets.

@malev
Created April 22, 2014 19:19
Show Gist options
  • Save malev/11191066 to your computer and use it in GitHub Desktop.
Save malev/11191066 to your computer and use it in GitHub Desktop.
Filter Medicare dataset by state
#!/usr/bin/env python
import os
import csv
import glob
import optparse
class CSVHandler:
    """Read, filter and append rows of a single CSV file.

    The first row of the file is treated as the header.  ``titles`` holds
    that header and ``rows`` holds the remaining data rows once
    ``populate()`` has run.
    """

    def __init__(self, filename):
        """Remember *filename*; nothing is read until it is needed."""
        self.filename = filename
        self.rows = []
        self.titles = []

    @staticmethod
    def _open_csv(path, mode):
        """Open *path* the way the csv module expects on this interpreter.

        Python 3 wants text mode with ``newline=''``; Python 2 wants a
        binary-mode file.  Anything else can corrupt line endings.
        """
        import sys  # local import keeps the class self-contained

        if sys.version_info[0] >= 3:
            return open(path, mode, newline='')
        return open(path, mode + 'b')

    def populate(self):
        """Load the file into ``rows`` / ``titles`` if ``rows`` is empty.

        An empty file leaves both attributes as empty lists instead of
        raising ``IndexError``.
        """
        if self.rows:
            return
        with self._open_csv(self.filename, 'r') as csvfile:
            self.rows = list(csv.reader(csvfile))
        if self.rows:
            # The first line is the header, not data.
            self.titles = self.rows.pop(0)

    def find_column(self, values):
        """Return the index of the first header cell matching any of *values*.

        Matching uses ``compare`` (case- and whitespace-insensitive).
        Returns ``None`` when no header cell matches or the file is empty.
        """
        for idx, title in enumerate(self.get_row(0)):
            if any(self.compare(value, title) for value in values):
                return idx
        return None

    def filter_col(self, col, value):
        """Return the data rows whose cell at index *col* matches *value*.

        Rows too short to have a cell at *col* (for example blank lines)
        are skipped rather than raising ``IndexError``.
        """
        self.populate()
        return [
            row for row in self.rows
            if -len(row) <= col < len(row) and self.compare(row[col], value)
        ]

    def compare(self, str1, str2):
        """Return True if the strings are equal ignoring case and outer spaces."""
        return str1.lower().strip() == str2.lower().strip()

    def store(self, rows):
        """Append every row in *rows* to the end of the file."""
        with self._open_csv(self.filename, 'a') as csvfile:
            csv.writer(csvfile).writerows(rows)

    def add_row(self, row):
        """Append a single *row* to the end of the file."""
        with self._open_csv(self.filename, 'a') as csvfile:
            csv.writer(csvfile).writerow(row)

    def get_row(self, idx):
        """Return the row at zero-based position *idx*, read from disk.

        Returns an empty list when *idx* is negative or the file has fewer
        than ``idx + 1`` rows.
        """
        output = []
        with self._open_csv(self.filename, 'r') as csvfile:
            reader = csv.reader(csvfile)
            for _ in range(idx + 1):
                # next() with a default works on Python 2.6+ and 3 and
                # avoids StopIteration on short files.
                output = next(reader, None)
                if output is None:
                    return []
        return output
class Cleaner:
    """Copy the rows matching one state from every CSV in a directory.

    For each ``*.csv`` file in the source directory, a file of the same
    name is written to the destination directory containing the header
    and the rows whose selected column matches ``state``.
    """

    def __init__(self, options):
        """Read ``from``, ``to`` and ``state`` from the parsed *options*.

        *options* is any object whose ``vars()`` contains those three
        keys.  The destination directory is created if needed.
        """
        options = vars(options)
        self.from_dir = options['from']
        self.to_dir = options['to']
        self.state = options['state']
        self.check_output()

    def check_output(self):
        """Ensure the destination directory exists.

        Raises ``OSError`` if it cannot be created or the path exists but
        is not a directory.
        """
        try:
            os.makedirs(self.to_dir)
        except OSError:
            # Already existing as a directory is fine; anything else is not.
            if not os.path.isdir(self.to_dir):
                raise

    def find_files(self):
        """Return the base names of the ``*.csv`` files in the source directory.

        The list is sorted so files are processed in a stable order.
        """
        pattern = os.path.join(self.from_dir, '*.csv')
        return sorted(os.path.basename(path) for path in glob.glob(pattern))

    def call(self):
        """Filter every source CSV and write the result to the destination.

        Files without a header cell titled ``state`` or ``provider id``
        are skipped.
        """
        for filename in self.find_files():
            csv_input = CSVHandler(self.input_file(filename))
            column_idx = csv_input.find_column(['state', 'provider id'])
            if column_idx is None:
                continue
            state_rows = csv_input.filter_col(column_idx, self.state)
            csv_output = CSVHandler(self.output_file(filename))
            # NOTE(review): both writes append, so re-running into the same
            # destination adds a second header and duplicate rows -- confirm
            # whether that is intended.
            csv_output.add_row(csv_input.titles)
            csv_output.store(state_rows)

    def input_file(self, filename):
        """Return the path of *filename* inside the source directory."""
        return os.path.join(self.from_dir, filename)

    def output_file(self, filename):
        """Return the path of *filename* inside the destination directory."""
        return os.path.join(self.to_dir, filename)
def main():
    """Parse the command line and run the cleaner.

    All three options (``--from``, ``--to``, ``--state``) are required.
    If any is missing, the help text is printed and the parser exits
    through ``OptionParser.error``.
    """
    p = optparse.OptionParser()
    p.add_option('--from', '-f', help='directory containing the source CSV files')
    p.add_option('--to', '-t', help='directory the filtered CSV files are written to')
    p.add_option('--state', '-s', help='state value to keep')
    options, _ = p.parse_args()
    if None in vars(options).values():
        # print(...) with a single argument behaves the same on Python 2 and 3.
        print("Super cool data cleaner")
        p.print_help()
        p.error('Options are mandatory')
    print("Starting")
    Cleaner(options).call()
# Run the command-line entry point only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment