Created
April 22, 2014 19:19
-
-
Save malev/11191066 to your computer and use it in GitHub Desktop.
Filter Medicare dataset by state
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import csv | |
import glob | |
import optparse | |
class CSVHandler:
    """Read, filter and append to a single CSV file.

    The first row of the file is treated as the header.  ``populate`` loads
    the file into memory (header into ``titles``, remaining rows into
    ``rows``); ``store`` and ``add_row`` append to the file on disk and do
    not touch the in-memory copy.
    """

    def __init__(self, filename):
        """Remember *filename*; nothing is read until it is needed."""
        self.filename = filename
        self.rows = []    # data rows (header excluded), filled by populate()
        self.titles = []  # header row, filled by populate()

    def populate(self):
        """Load the file into ``titles`` and ``rows`` if not loaded yet.

        An empty file leaves both lists empty instead of raising.
        """
        # Checking titles as well avoids re-reading a header-only file.
        if self.rows or self.titles:
            return
        with open(self.filename) as csvfile:
            self.rows = list(csv.reader(csvfile))
        if self.rows:
            self.titles = self.rows.pop(0)

    def find_column(self, values):
        """Return the index of the first header cell matching any of *values*.

        Matching uses ``compare`` (case-insensitive, surrounding whitespace
        ignored).  Returns ``None`` when no header cell matches, including
        when the file has no header row at all.
        """
        for idx, title in enumerate(self.get_row(0)):
            if any(self.compare(value, title) for value in values):
                return idx
        return None

    def filter_col(self, col, value):
        """Return the data rows whose cell at index *col* matches *value*.

        Rows too short to have a cell at *col* (e.g. blank lines, which the
        csv reader yields as empty lists) are skipped.
        """
        self.populate()
        return [
            row for row in self.rows
            if len(row) > col and self.compare(row[col], value)
        ]

    def compare(self, str1, str2):
        """Return True if the strings are equal ignoring case and outer whitespace."""
        return str1.lower().strip() == str2.lower().strip()

    def store(self, rows):
        """Append every row in *rows* to the file."""
        with open(self.filename, 'a') as csvfile:
            csv.writer(csvfile).writerows(rows)

    def add_row(self, row):
        """Append a single *row* to the file."""
        with open(self.filename, 'a') as csvfile:
            csv.writer(csvfile).writerow(row)

    def get_row(self, idx):
        """Return row number *idx* (0-based, header is row 0) read from disk.

        Returns an empty list when *idx* is negative or the file has fewer
        than ``idx + 1`` rows.
        """
        row = []
        with open(self.filename, 'r') as csvfile:
            reader = csv.reader(csvfile)
            for _ in range(idx + 1):
                # next() with a default works on Python 2.6+ and 3, and
                # lets a short file yield [] instead of StopIteration.
                row = next(reader, None)
                if row is None:
                    return []
        return row
class Cleaner:
    """Copy the rows for one state from every CSV in a directory to another.

    For each ``*.csv`` file in the source directory, the header is searched
    for a column titled ``state`` or ``provider id``; files without such a
    column are skipped.  Matching rows are written, preceded by the header,
    to a file of the same name in the destination directory.
    """

    def __init__(self, options):
        """Read the ``from``, ``to`` and ``state`` attributes of *options*.

        *options* is any object whose ``vars()`` contains those three keys
        (e.g. the values object returned by ``optparse``).  The destination
        directory is created immediately if it does not exist.
        """
        options = vars(options)
        self.from_dir = options['from']
        self.to_dir = options['to']
        self.state = options['state']
        self.check_output()

    def check_output(self):
        """Create the destination directory if it is missing."""
        if not os.path.exists(self.to_dir):
            os.makedirs(self.to_dir)

    def find_files(self):
        """Return the base names of the ``*.csv`` files in the source directory."""
        pattern = os.path.join(self.from_dir, '*.csv')
        return [os.path.basename(path) for path in glob.glob(pattern)]

    def call(self):
        """Filter every source CSV by state and write the result."""
        for filename in self.find_files():
            csv_input = CSVHandler(self.input_file(filename))
            column_idx = csv_input.find_column(['state', 'provider id'])
            if column_idx is None:
                # No recognisable column in this file: nothing to filter on.
                continue
            state_rows = csv_input.filter_col(column_idx, self.state)
            # NOTE(review): CSVHandler appends to its file, so running twice
            # into the same destination duplicates the header and rows.
            csv_output = CSVHandler(self.output_file(filename))
            csv_output.add_row(csv_input.titles)
            csv_output.store(state_rows)

    def input_file(self, filename):
        """Return the path of *filename* inside the source directory."""
        return os.path.join(self.from_dir, filename)

    def output_file(self, filename):
        """Return the path of *filename* inside the destination directory."""
        return os.path.join(self.to_dir, filename)
def main(argv=None):
    """Parse the command line and run the cleaner.

    *argv* is the list of arguments to parse; when ``None`` (the default)
    ``sys.argv[1:]`` is used.  All three options (``--from``, ``--to``,
    ``--state``) are required: if any is missing, the banner and help text
    are printed and the parser exits with an error.
    """
    p = optparse.OptionParser()
    p.add_option('--from', '-f', help='directory containing the input CSV files')
    p.add_option('--to', '-t', help='directory the filtered CSV files are written to')
    p.add_option('--state', '-s', help='state value to keep')
    (options, args) = p.parse_args(argv)
    if None in vars(options).values():
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("Super cool data cleaner")
        p.print_help()
        p.error('Options are mandatory')
    print("Starting")
    Cleaner(options).call()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment