@angelialau
Created June 5, 2020 14:01
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType
import sys
# constants
VIOLATIONS_FILEPATH = 'hdfs:///tmp/bdm/nyc_parking_violation/*'
CSCL_FILEPATH = 'hdfs:///tmp/bdm/nyc_cscl.csv'
YEARS = list(range(2015, 2020))
COUNTY_MAP = {'MAN': '1','MH': '1','MN': '1','NEWY': '1','NEW Y': '1','NY': '1',
'BRONX': '2', 'BX': '2',
'BK': '3', 'K': '3', 'KING': '3', 'KINGS': '3',
'Q': '4', 'QN': '4', 'QNS': '4', 'QU': '4', 'QUEEN': '4',
'R': '5', 'RICHMOND': '5'}
HSE_COLS = ['L_LOW_HN', 'L_HIGH_HN', 'R_LOW_HN', 'R_HIGH_HN']
CSCL_COLS = ['PHYSICALID', 'BOROCODE', 'FULL_STREE', 'ST_LABEL'] + HSE_COLS
# get lin reg coefficients using statsmodels
def get_coef(y1, y2, y3, y4, y5):
    # imports live inside the function so they are available on the executors
    # when this is called as a UDF
    import numpy as np
    import statsmodels.api as sm
    x = np.array(YEARS)
    X = sm.add_constant(x)
    y = np.array([y1, y2, y3, y4, y5])
    return float(sm.OLS(y, X).fit().params[1])

if __name__ == '__main__':
    sc = SparkContext()
    spark = SparkSession(sc)

    ############### 1. Preparing violations data
    # read violations data
    vio = spark.read.csv(VIOLATIONS_FILEPATH, header=True)\
        .select('Issue date', 'Street Name', 'House Number', 'Violation County')\
        .dropna(how='any').cache()
    # condition for years 2015 - 2019
    vio = vio.withColumn('year', F.to_date(vio['Issue date'], 'MM/dd/yyyy'))
    vio = vio.withColumn('year', F.year(vio.year))
    cond_year = vio.year.isin(YEARS)
    # condition for house numbers: (#), (# #), (#-#)
    cond_hn = vio['House Number'].rlike('^[0-9]+([ -][0-9]+)?$')
    # remove rows with invalid years or house numbers
    vio = vio.filter(cond_year & cond_hn)
    # rename columns
    vio = vio.select('year',
                     F.col('Violation County').alias('county'),
                     F.col('Street Name').alias('st_name'),
                     F.col('House Number').alias('hse_num'))
    # uppercase street names
    vio = vio.withColumn('st_name', F.upper(vio.st_name))
    # replace county with borough codes
    vio = vio.replace(COUNTY_MAP, subset='county')
    # split house number by space or hyphen
    vio = vio.withColumn('hse_num', F.split(vio.hse_num, ' |-'))
    # get total counts by location and year
    vio = vio.groupby(vio.columns).count()
    # pivot by year, to reduce number of rows
    vio = vio.groupby('county', 'st_name', 'hse_num').pivot('year', YEARS).agg(F.max('count'))
    # type cast array elements to int
    vio = vio.withColumn('hse_num', F.expr('transform(hse_num, x -> int(x))'))

    ############### 2. Preparing centerline data
    # read file
    cscl = spark.read.csv(CSCL_FILEPATH, header=True).select(CSCL_COLS).cache()
    cscl = cscl.withColumn('PHYSICALID', cscl.PHYSICALID.astype('int'))
    for hse_col in HSE_COLS:
        # split by delimiter
        cscl = cscl.withColumn(hse_col, F.split(hse_col, ' |-'))
        # type cast house-number parts to int
        cscl = cscl.withColumn(hse_col, F.expr('transform(%s, x -> int(x))' % hse_col))
    # consolidate FULL_STREE and ST_LABEL into a single column
    cscl = cscl.groupby('PHYSICALID')\
        .agg(F.array_join(F.collect_set('FULL_STREE'), '_').alias('full_st'),
             F.array_join(F.collect_set('ST_LABEL'), '_').alias('st_label'),
             F.first('L_LOW_HN').alias('L_LOW_HN'),
             F.first('L_HIGH_HN').alias('L_HIGH_HN'),
             F.first('R_LOW_HN').alias('R_LOW_HN'),
             F.first('R_HIGH_HN').alias('R_HIGH_HN'),
             F.first('BOROCODE').alias('BOROCODE'),
             )
    cscl = cscl.withColumn('st', F.array_distinct(F.split(F.concat_ws('_', 'full_st', 'st_label'), '_')))

    ############### 3. Join violations to centerline
    # condition for borough
    cond_boro = (vio.county == cscl.BOROCODE)
    # condition for street name
    cond_st = F.expr('array_contains(st, st_name)')
    # condition for house number: even numbers check the R side, odd numbers the L side
    subcond_even = ((F.element_at(vio.hse_num, -1) % 2 == 0)
                    & (F.size(vio.hse_num) == F.size(cscl.R_LOW_HN))
                    & (F.element_at(vio.hse_num, -1) >= F.element_at(cscl.R_LOW_HN, -1))
                    & (F.element_at(vio.hse_num, -1) <= F.element_at(cscl.R_HIGH_HN, -1))
                    & (((F.size(vio.hse_num) == 2) & (vio.hse_num[0] == cscl.R_LOW_HN[0]))
                       | (F.size(vio.hse_num) == 1)))
    subcond_odd = ((F.element_at(vio.hse_num, -1) % 2 != 0)
                   & (F.size(vio.hse_num) == F.size(cscl.L_LOW_HN))
                   & (F.element_at(vio.hse_num, -1) >= F.element_at(cscl.L_LOW_HN, -1))
                   & (F.element_at(vio.hse_num, -1) <= F.element_at(cscl.L_HIGH_HN, -1))
                   & (((F.size(vio.hse_num) == 2) & (vio.hse_num[0] == cscl.L_LOW_HN[0]))
                      | (F.size(vio.hse_num) == 1)))
    cond_hn = (subcond_even | subcond_odd)
    # actual join
    joined = vio.join(F.broadcast(cscl), [cond_boro, cond_hn, cond_st])\
        .select('PHYSICALID', '2015', '2016', '2017', '2018', '2019')\
        .fillna(0).cache()
    # aggregate counts by physical id
    joined = joined.groupby('PHYSICALID')\
        .agg(F.sum('2015').alias('2015'),
             F.sum('2016').alias('2016'),
             F.sum('2017').alias('2017'),
             F.sum('2018').alias('2018'),
             F.sum('2019').alias('2019'))
    # union with distinct physical ids to recover ids with no violations
    distinct_cscl = cscl.select('PHYSICALID')\
        .distinct().alias('distinct_ids')\
        .cache()
    distinct_cscl = distinct_cscl.withColumn('2015', F.lit(0))\
        .withColumn('2016', F.lit(0))\
        .withColumn('2017', F.lit(0))\
        .withColumn('2018', F.lit(0))\
        .withColumn('2019', F.lit(0))
    joined = joined.union(distinct_cscl)
    joined = joined.groupby('PHYSICALID')\
        .agg(F.max('2015').alias('2015'),
             F.max('2016').alias('2016'),
             F.max('2017').alias('2017'),
             F.max('2018').alias('2018'),
             F.max('2019').alias('2019'))

    ############### 4. Linear regression
    get_coef_udf = F.udf(get_coef, FloatType())
    joined = joined.withColumn('coef', get_coef_udf(joined['2015'],
                                                    joined['2016'],
                                                    joined['2017'],
                                                    joined['2018'],
                                                    joined['2019']))
    out_folder = sys.argv[1]
    joined.orderBy('PHYSICALID').write.csv(out_folder, mode='overwrite')
@angelialau (Author)

A PySpark big data program that joins NYC Parking Violations Issued - Fiscal Year 2015-2019 data (~10 GB) to NYC Street Centerline data (~650k street segments). It calculates the total number of parking violations per street segment for each year and estimates the rate of change over the years using Ordinary Least Squares. The program runs within 13 minutes; a brief outline of the approach follows.

Summary of Approach

  1. Preprocessing the violations and centerline data. I prioritized reducing the number of rows in each dataset.
    • Violations data
      1. Keeping records from years 2015-2019
      2. Filtering house numbers by regex, keeping only those of the patterns:
        • number
        • number-space-number
        • number-hyphen-number
      3. Splitting house numbers by either space or hyphen (a small illustration follows this list)
      4. Uppercasing the street names
      5. Replacing violation county with the respective borough codes
      6. Grouping by borocode, street name and house number columns to get counts
      7. Pivoting by year so that each year has its own column
    • Centerline data
      1. Splitting each of the house number columns by either space or hyphen
      2. Grouping by PHYSICALID and combining the FULL_STREE and ST_LABEL columns into a single column, 'st', containing the set of unique street names
  2. Inner-joining the two dataframes, broadcasting the centerline data, on these conditions:
    1. The borocodes are the same
    2. The violation street name is inside the set of unique street names associated with the PHYSICALID
    3. The house numbers match, depending on whether the number is odd (checked against the L columns) or even (checked against the R columns) and on whether it originally had a hyphen/space, which I detect from the length of the split array (see the sketch after this list)
  3. Grouping by PHYSICALID, summing the counts for the respective years
  4. Since the previous join was an inner join, centerline segments without any violations were lost, so I created a new dataframe containing the unique PHYSICALIDs with zero counts, unioned it with the joined dataframe, and took the max value for each year (a toy illustration follows this list)
  5. Performing linear regression using a udf and statsmodels.api.OLS (a toy slope example follows this list)
  6. Ordering by PHYSICALID
  7. Saving to CSV
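
To make the house-number handling in step 1 concrete, here is a small plain-Python illustration (the sample strings are made up) of which values survive the regex filter and how compound numbers are split; it mirrors the rlike and split calls in the script:

import re

HN_PATTERN = re.compile(r'^[0-9]+([ -][0-9]+)?$')    # same pattern used in the rlike() filter

for hn in ['141', '43-17', '104 19', '12B', 'N/A']:
    if HN_PATTERN.match(hn):
        print(hn, '->', re.split(r'[ -]', hn))        # kept, then split on space or hyphen
    else:
        print(hn, '-> dropped')                       # '12B' and 'N/A' are filtered out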
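
The house-number condition from step 2.3, re-expressed as a plain-Python sketch on made-up values; the actual job expresses the same logic with Spark column functions (element_at, size) over the split house-number arrays:

def hn_matches(hse_num, low, high):
    # hse_num, low and high are house numbers already split into lists of ints,
    # e.g. [43, 17] for '43-17'; low/high come from the L_* columns for odd
    # numbers and from the R_* columns for even numbers
    if len(hse_num) != len(low):                        # plain vs compound must agree
        return False
    if not (low[-1] <= hse_num[-1] <= high[-1]):        # last part within the segment's range
        return False
    return len(hse_num) == 1 or hse_num[0] == low[0]    # compound numbers share the first part

print(hn_matches([43, 17], [43, 1], [43, 25]))    # True: 43-17 lies within 43-01..43-25
print(hn_matches([140], [100], [198]))            # True: 140 lies within 100..198
print(hn_matches([44, 17], [43, 1], [43, 25]))    # False: first parts differ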
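
Step 4's recovery of segments with no violations, shown on a toy dataframe (the IDs and counts are invented, and only one year column is kept for brevity):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master('local[*]').getOrCreate()

with_counts = spark.createDataFrame([(1, 5), (3, 2)], ['PHYSICALID', '2015'])    # segments that matched violations
all_ids = spark.createDataFrame([(1,), (2,), (3,)], ['PHYSICALID'])              # every centerline segment
zeros = all_ids.withColumn('2015', F.lit(0))

recovered = with_counts.union(zeros)\
    .groupby('PHYSICALID').agg(F.max('2015').alias('2015'))
recovered.orderBy('PHYSICALID').show()    # segment 2 now appears with a count of 0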
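
Finally, a standalone check (toy counts, not from the data) of what the OLS coefficient in step 5 represents: the fitted slope is the average change in violations per year for a segment, which is exactly what get_coef returns:

import numpy as np
import statsmodels.api as sm

x = np.array([2015, 2016, 2017, 2018, 2019])
y = np.array([3, 4, 6, 7, 10])     # made-up yearly counts for one street segment
slope = sm.OLS(y, sm.add_constant(x)).fit().params[1]
print(slope)                       # 1.7, i.e. about 1.7 additional violations per year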
