Created
June 17, 2018 21:00
-
-
Save robla/7664d03372e6a80f1372869c09472b60 to your computer and use it in GitHub Desktop.
sfballotparse.py - parse sfelections.sfgov.org/results ballot image files
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# MIT License | |
# | |
# Copyright (c) 2018 Rob Lanphier | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in all | |
# copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
# SOFTWARE. | |
""" | |
Parse the ballot image files provided by the City of San Francisco: | |
https://sfelections.sfgov.org/results | |
""" | |
# Fixed-width layout of one ballot image line, as CSV text consumed by
# read_fieldspec().  'lastchar' is the 1-based position of the final
# character of the field; 'length' is the field width and must equal the
# gap between consecutive 'lastchar' values.
BALLOTIMAGE_FIELDSPEC = """fieldname,lastchar,length
Contest_Id,7,7
Pref_Voter_Id,16,9
Serial_Number,23,7
Tally_Type_Id,26,3
Precinct_Id,33,7
Vote_Rank,36,3
Candidate_Id,43,7
Over_Vote,44,1
Under_Vote,45,1
"""
# Fixed-width layout of one master lookup line, as CSV text consumed by
# read_fieldspec().  Same column meaning as the ballot image spec:
# 'lastchar' is the 1-based position of the field's final character and
# 'length' is the field width.
MASTERLOOKUP_FIELDSPEC = """fieldname,lastchar,length
Record_Type,10,10
Id,17,7
Description,67,50
List_Order,74,7
Candidates_Contest_Id,81,7
Is_WriteIn,82,1
Is_Provisional,83,1
"""
import argparse | |
from collections import OrderedDict | |
from copy import deepcopy | |
import csv | |
import fileinput | |
from io import StringIO | |
import json | |
import re | |
import sys | |
import urllib.parse | |
def read_fieldspec(fieldspec_string):
    """
    Build a fixed-width-record regex from a CSV field specification.

    The image files and the master lookup (among others) provided in
    SF elections are fixed-width text fields.  This function:
    1.  ...reads the CSV text that specifies each field's name, the
        position of its last character, and its length.
    2.  ...builds a regex with one capture group per field which can
        be used to read the fixed-width fields.

    fieldspec_string -- CSV text whose header row names the columns
        'fieldname', 'lastchar' and 'length'.

    Returns a (regex, fieldnames) tuple: the pattern string and the
    list of field names in capture-group order.

    Raises ValueError when a field's declared 'length' disagrees with
    the width implied by consecutive 'lastchar' values.
    """
    # 1. read the csv spec into a list of dicts
    list_of_rows = list(csv.DictReader(StringIO(fieldspec_string)))

    # 2. build regex from list_of_rows
    regex_parts = [r'^']
    fieldnames = []
    pos = 0
    for field in list_of_rows:
        lastchar = int(field['lastchar'])
        width = lastchar - pos
        # Validate *before* advancing pos; once pos == lastchar the
        # difference is always zero and the check can never fire.
        if width != int(field['length']):
            raise ValueError(
                "Length mismatch in {0}".format(field['fieldname']))
        regex_parts.append(r'(.{' + str(width) + r'})')
        fieldnames.append(field['fieldname'])
        pos = lastchar
    return (''.join(regex_parts), fieldnames)
def read_data_file(fieldspec, datafile):
    """
    Generate one OrderedDict per line of a fixed-width data file.

    This function uses a regex (created in read_fieldspec) to convert
    each line of an SF election ballot image file into a Python
    OrderedDict suitable for output as JSON or YAML.

    fieldspec -- CSV field specification text, passed straight to
        read_fieldspec().
    datafile -- file name (or names) handed to fileinput.input().

    Yields an OrderedDict mapping each field name to the raw text
    captured for it, in field-spec order.

    Raises ValueError on the first line the generated regex does not
    match.
    """
    (regex, fields) = read_fieldspec(fieldspec)
    # compile once rather than looking the pattern up on every line
    pattern = re.compile(regex)
    # The context manager closes the fileinput state even when a bad
    # line raises or the caller abandons the generator early, so a later
    # fileinput.input() call is not rejected as "already active".
    with fileinput.input(datafile) as lines:
        for line in lines:
            regmatch = pattern.match(line)
            if regmatch is None:
                raise ValueError('generated regex does not match datafile')
            # one capture group per field, in the same order as `fields`
            yield OrderedDict(zip(fields, regmatch.groups()))
def convert_to_ballots(imagelines, lookuplines, contestid=None):
    """
    Group per-preference image lines into one record per ballot.

    Each line of the ballot image file contains just one of many
    possible candidate preferences expressed on a given ballot.  For
    example, in the 2018 SF Mayoral race, voters could choose up to 3
    preferences for mayor.  Each preference expressed would have its own
    line in the image file.  This function aggregates all of the
    preferences expressed on a ballot into a single hierarchical data
    structure, with one set of ballotfields per ballot, and many sets
    of votefields (one set per candidate chosen).

    imagelines -- iterable of dicts keyed by the ballot image field
        names; lines of the same ballot must be consecutive.
    lookuplines -- iterable of dicts keyed by the master lookup field
        names; iterated once to collect candidate names.
    contestid -- Contest_Id to extract; defaults to the Id of the first
        'Contest' record found in lookuplines.

    Yields an OrderedDict per ballot holding the ballot fields plus a
    'votes' list of OrderedDicts with 'rank', 'candidate' and, where
    flagged, 'exception' ('overvote' or 'undervote').  Nothing is
    yielded when no image line belongs to the contest.

    Raises ValueError if a line is flagged as both overvote and
    undervote, and KeyError if a Candidate_Id is absent from the lookup.
    """
    ballotfields = [
        'Contest_Id',
        'Pref_Voter_Id',
        'Serial_Number',
        'Tally_Type_Id',
        'Precinct_Id'
    ]

    # build up dict to look up candidate names from id; the all-zero id
    # maps to None (no candidate recorded on that line)
    candpool = OrderedDict()
    candpool['0000000'] = None
    for lookupline in lookuplines:
        record_type = lookupline['Record_Type'].strip()
        if record_type == 'Candidate':
            candpool[lookupline['Id']] = lookupline['Description'].strip()
        elif record_type == 'Contest' and not contestid:
            # default contestid will be the first one listed
            contestid = lookupline['Id']

    # None until the first matching image line starts a ballot
    thisballot = None
    for imageline in imagelines:
        # skip over all imagelines that aren't associated with the
        # contestid passed in
        if imageline['Contest_Id'] != contestid:
            continue

        # each ballot may result in several image lines (one for each
        # preference the voter marks).  See if this line is the same
        # voter/ballot as the previous line
        if (thisballot is None or
                thisballot['Pref_Voter_Id'] != imageline['Pref_Voter_Id']):
            # if the Pref_Voter_Id doesn't line up, that means we're
            # done with a ballot.  yield it from this function, then
            # start building a new ballot from this line.
            if thisballot is not None:
                yield thisballot
            thisballot = OrderedDict(
                (field, imageline[field]) for field in ballotfields)
            thisballot['votes'] = []

        # store the preference associated with this imageline in
        # "thisvote"
        thisvote = OrderedDict()
        thisvote['rank'] = int(imageline['Vote_Rank'])
        thisvote['candidate'] = candpool[imageline['Candidate_Id']]
        overvote = (imageline['Over_Vote'] == '1')
        undervote = (imageline['Under_Vote'] == '1')
        if overvote and undervote:
            raise ValueError('both overvote and undervote flagged')
        elif overvote:
            thisvote['exception'] = 'overvote'
        elif undervote:
            thisvote['exception'] = 'undervote'
        thisballot['votes'].append(thisvote)

    # now that we're out of the loop, yield the last ballot -- but only
    # if one was actually started, so empty input yields nothing
    if thisballot is not None:
        yield thisballot
def dump_url_encoded(outputrecords, outfh):
    """
    Write each ballot as one URL-encoded "candidate=rank" line.

    outputrecords -- iterable of ballot dicts, each with a 'votes' list
        whose entries carry 'candidate' and 'rank'.
    outfh -- writable text file object receiving one line per ballot.

    Any number of votes per ballot is accepted.  When a candidate
    appears more than once on a ballot, the entry that comes first in
    'votes' (the higher-ranked duplicate) takes priority.
    """
    for rec in outputrecords:
        outrec = {}
        for vote in rec['votes']:
            # setdefault keeps the first rank seen for a candidate, so
            # the higher ranked duplicates take priority over the lower
            # NOTE(review): a candidate of None (e.g. an undervote) is
            # emitted as "None=<rank>" -- confirm that is what the
            # consumer expects.
            outrec.setdefault(vote['candidate'], vote['rank'])
        print(urllib.parse.urlencode(outrec), file=outfh)
def main(argv=None):
    """
    Command-line entry point: parse the lookup and image files and
    write the requested records as JSON or URL-encoded lines.

    argv -- full argument vector including the program name in
        position 0 (i.e. shaped like sys.argv); when None, argparse
        reads sys.argv itself.

    Returns None, which sys.exit() treats as success.

    Raises ValueError if --outputformat is not 'json' or 'urlencoded'.
    """
    # use the first non-blank line of the module docstring as the
    # description; __doc__ is None when docstrings are stripped (-OO)
    doc_lines = (__doc__ or '').strip().splitlines()
    parser = argparse.ArgumentParser(
        description=doc_lines[0] if doc_lines else None)
    parser.add_argument('--imagelines',
                        help='print records for imagelines',
                        action="store_true")
    parser.add_argument('lookupfile',
                        help='master lookup file for this election')
    parser.add_argument('imagefile', help='ballot image file')
    parser.add_argument('-o', '--outfile', help='output file',
                        default=None)
    parser.add_argument('--contestid',
                        help='contest id; defaults to first found',
                        default=None)
    parser.add_argument('--outputformat',
                        help='output format: json (default), urlencoded'
                        ' (for Brian Olson\'s voteutil)', default="json")
    # argv[0] is the program name, which argparse must not see
    args = parser.parse_args(None if argv is None else argv[1:])

    # reject an unknown format before any input is read or the output
    # file is opened (and thereby truncated)
    if args.outputformat not in ('json', 'urlencoded'):
        raise ValueError(
            'args.outputformat {} not recognized'.format(args.outputformat))

    lookuplines = list(read_data_file(
        MASTERLOOKUP_FIELDSPEC, args.lookupfile))
    imagelines = read_data_file(BALLOTIMAGE_FIELDSPEC, args.imagefile)

    if args.imagelines:
        # TODO: filter imagelines by args.contestid
        outputrecords = imagelines
    else:
        outputrecords = convert_to_ballots(imagelines, lookuplines,
                                           contestid=args.contestid)

    outfh = open(args.outfile, 'w') if args.outfile else sys.stdout
    try:
        if args.outputformat == 'json':
            # json cannot serialize a generator, so materialize it first
            json.dump(list(outputrecords), outfh, indent=4)
        else:
            dump_url_encoded(outputrecords, outfh)
    finally:
        # close only what we opened; never close the caller's stdout
        if outfh is not sys.stdout:
            outfh.close()
# Script entry point: hand the full argument vector to main() and use
# its return value as the process exit status.
if __name__ == '__main__':
    sys.exit(main(sys.argv))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment