Created
October 13, 2020 16:49
-
-
Save pjhoberman/10a7dd03eb8627e4c6ab8a2452df5ca1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
import datetime | |
location_tmpl = { | |
'COUNTY': None, | |
'NAME': None, | |
'DIRECTIONS': None, | |
'IS_DROP_BOX': None, | |
'IS_EARLY_VOTING': None, | |
'ADDRESS LINE 1': None, | |
'ADDRESS LINE 2': None, | |
'CITY': None, | |
'STATE': None, | |
'ZIP': None, | |
'START TIME': None, | |
'END TIME': None, | |
'START DATE': None, | |
'END DATE': None, | |
} | |
keys = ['COUNTY', 'NAME', 'DIRECTIONS', 'IS_DROP_BOX', 'IS_EARLY_VOTING', 'ADDRESS LINE 1', 'ADDRESS LINE 2', 'CITY', 'STATE', 'ZIP', 'START TIME', 'END TIME', 'START DATE', 'END DATE'] | |
locations = [] | |
with open('co.csv', 'r') as file: | |
reader = csv.reader(file) | |
previous_address = None | |
for i, line in enumerate(reader): | |
# print(i, line) | |
if i == 0: | |
continue | |
# print(line) | |
# print(f"{i} - {previous_address}") | |
location = dict(zip(keys, line)) | |
start_time = datetime.datetime.strptime( | |
f"{location['START DATE']} {location['START TIME']}", | |
'%m/%d/%Y %I:%M %p' | |
) | |
end_time = datetime.datetime.strptime( | |
f"{location['END DATE']} {location['END TIME']}", | |
'%m/%d/%Y %I:%M %p') | |
location['times'] = [{ | |
'drop_box': location['IS_DROP_BOX'], | |
'early_voting': location['IS_EARLY_VOTING'], | |
'start_time': location['START TIME'], | |
'end_time': location['END TIME'], | |
'start_date': location['START DATE'], | |
'end_date': location['END DATE'], | |
'start_datetime': start_time, | |
'end_datetime': end_time, | |
}] | |
# see if this location already exists | |
location_test = [idx for idx, l in enumerate(locations) if | |
l.get('COUNTY', None) == location.get('COUNTY', None) and | |
l.get('ADDRESS LINE 1', None) == location.get('ADDRESS LINE 1', None)] | |
if location_test: | |
locations[location_test[0]]['times'].append({ | |
'drop_box': location['IS_DROP_BOX'], | |
'early_voting': location['IS_EARLY_VOTING'], | |
'start_time': location['START TIME'], | |
'end_time': location['END TIME'], | |
'start_date': location['START DATE'], | |
'end_date': location['END DATE'], | |
'start_datetime': start_time, | |
'end_datetime': end_time, | |
}) | |
else: | |
locations.append(location) | |
def next_day(dt1, dt2): | |
""" | |
:param dt1: datetime | |
:param dt2: datetime | |
:return: boolean | |
""" | |
return dt2.day == (dt1 + datetime.timedelta(days=1)).day | |
def actual_times(times): | |
if not times: | |
return False, [] | |
previous_start_time = None | |
previous_end_time = None | |
previous_datetime = None | |
start_date = None | |
end_date = None | |
actuals = [] | |
date_check = [] | |
issue = False | |
# todo: check for skipped days | |
for time in times: | |
if not previous_start_time: | |
previous_start_time = time.get('start_time') | |
previous_end_time = time.get('end_time') | |
previous_datetime = time.get('end_datetime') | |
start_date = time.get('start_date') | |
end_date = time.get('end_date') | |
elif time.get('start_time') != previous_start_time or time.get('end_time') != previous_end_time or not next_day(previous_datetime, time.get('end_datetime')): | |
if start_date == end_date: | |
actuals.append(f"{start_date}: {previous_start_time} - {previous_end_time}") | |
date_check.append(start_date) | |
else: | |
actuals.append(f"{start_date} - {end_date}: {previous_start_time} - {previous_end_time}") | |
date_check.append(start_date) | |
previous_start_time = time.get('start_time') | |
previous_end_time = time.get('end_time') | |
previous_datetime = time.get('end_datetime') | |
start_date = time.get('start_date') | |
end_date = time.get('end_date') | |
else: | |
end_date = time.get('end_date') | |
previous_datetime = time.get('end_datetime') | |
# capture last time | |
if time.get('start_time') != previous_start_time or time.get('end_time') != previous_end_time: | |
if time.get('start_date') == time.get('end_date'): | |
actuals.append(f"{time.get('start_date')}: {time.get('start_time')} - {time.get('end_time')}") | |
date_check.append(start_date) | |
else: | |
actuals.append(f"{time.get('start_date')} - {time.get('end_date')}: {time.get('start_time')} - {time.get('end_time')}") | |
date_check.append(start_date) | |
# and if the last time matches | |
else: | |
if start_date == time.get('end_date'): | |
actuals.append(f"{start_date}: {time.get('start_time')} - {time.get('end_time')}") | |
date_check.append(start_date) | |
else: | |
actuals.append(f"{start_date} - {time.get('end_date')}: {time.get('start_time')} - {time.get('end_time')}") | |
date_check.append(start_date) | |
if len(date_check) > len(set(date_check)): | |
issue = True | |
return issue, actuals | |
for i, location in enumerate(locations): | |
drop_box = [] | |
drop_box_serviced = [] | |
early = [] | |
nov3 = [] | |
for time in location.get('times', None): | |
if time.get('drop_box', None) is True or time.get('drop_box', None) == "TRUE": | |
drop_box.append(time) | |
elif time.get('early_voting', None) is True or time.get('early_voting', None) == "TRUE": | |
early.append(time) | |
else: | |
nov3.append(time) | |
drop_box = sorted(drop_box, key=lambda t: t.get('start_datetime')) | |
early = sorted(early, key=lambda t: t.get('start_datetime')) | |
nov3 = sorted(nov3, key=lambda t: t.get('start_datetime')) | |
issue_drop, drop_box = actual_times(drop_box) | |
issue_early, early = actual_times(early) | |
issue_nov, nov3 = actual_times(nov3) | |
if any([issue_nov, issue_drop, issue_early]): | |
print(location['COUNTY'], location['NAME']) | |
location['drop_box_hours'] = "\n".join(drop_box) | |
# f"{time.get('start_date', None)} - {time.get('end_date', None)}: {time.get('start_time', None)} - {time.get('end_time', None)}" | |
# for time in drop_box]) | |
location['early_voting_hours'] = "\n".join(early) | |
# f"{time.get('start_date', None)} - {time.get('end_date', None)}: {time.get('start_time', None)} - {time.get('end_time', None)}" | |
# for time in early]) | |
location['elect_day_hours'] = "\n".join(nov3) | |
# f"{time.get('start_date', None)} - {time.get('end_date', None)}: {time.get('start_time', None)} - {time.get('end_time', None)}" | |
# for time in nov3]) | |
location['all_hours'] = "" | |
if location['drop_box_hours']: | |
location['all_hours'] += "Drop Box Hours:\n" + location['drop_box_hours'] + "\n\n" | |
if location['early_voting_hours']: | |
location['all_hours'] += "Early Voting Hours:\n" + location['early_voting_hours'] + "\n\n" | |
if location['elect_day_hours']: | |
location['all_hours'] += "Election Day Hours:\n" + location['elect_day_hours'] + "\n\n" | |
# cleanup | |
for location in locations: | |
if location.get('DIRECTIONS', None): | |
location['NAME'] += f", {location['DIRECTIONS']}" | |
if location.get('ADDRESS LINE 2', None): | |
location['ADDRESS LINE 1'] += f", {location['ADDRESS LINE 2']}" | |
for key in ['DIRECTIONS', 'ADDRESS LINE 2', 'IS_DROP_BOX', 'IS_EARLY_VOTING', | |
'START TIME', 'END TIME', 'START DATE', 'END DATE', | |
'times', 'drop_box_hours', 'early_voting_hours', 'elect_day_hours']: | |
location.pop(key, None) | |
with open('co_out.csv', 'w', newline='') as output_file: | |
dict_writer = csv.DictWriter(output_file, locations[-1].keys()) | |
dict_writer.writeheader() | |
dict_writer.writerows(locations) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment