Skip to content

Instantly share code, notes, and snippets.

@BioSciEconomist
Last active March 1, 2021 18:47
Show Gist options
  • Save BioSciEconomist/2f5264781b90b224f07b903bdccdfa17 to your computer and use it in GitHub Desktop.
Example of regression discontinuity design in python (compared to R)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Feb 26 20:16:01 2021
@author: mattbogard
"""
#*-----------------------------------------------------------------
# | PROGRAM NAME: ex RDD2.py
# | DATE: 2/26/21
# | CREATED BY: MATT BOGARD
# | PROJECT FILE:
# *----------------------------------------------------------------
# | PURPOSE: example code for RDD
# *----------------------------------------------------------------
# code based on: https://matheusfacure.github.io/python-causality-handbook/16-Regression-Discontinuity-Design.html
# data in csv files simulated using R in: https://gist.github.com/BioSciEconomist/68bf299688d8f12bd009d5a7943a90f6
import pandas as pd
import numpy as np
from matplotlib import style
from matplotlib import pyplot as plt
import seaborn as sns
import statsmodels.formula.api as smf
style.use("fivethirtyeight")
#----------------------------------
# example 1
#----------------------------------
# read simulated data: running variable x, treatment indicator D, outcome y
# (cutoff at x = 0; see simulation link in the header)
df = pd.read_csv('/Users/mattbogard/Google Drive/Python Scripts/example1_RDD.csv')
df.head()
# visualize simulated data and probabilities by running variable and cutoff
df.plot.scatter(x="x", y="y")
# basic regression on full data
smf.ols('y ~ D + x', data=df).fit().summary() # OLS on full data where D
# is a function of our running variable x with cutoff = 0.
# if we didn't already have a threshold or treatment indicator we could have assigned it
# rdd_df = df.assign(threshold=(df["x"] > 0).astype(int)) # this is the same as D in our simulated data
# rdd_df.head()
# estimate regression manually on restricted bandwidth (poor person's RDD):
# keep only observations within +/- .9 of the cutoff and refit
mask= (df['x'] > -.9) & (df['x'] < .9)
tmp = df[mask]
tmp.shape # check how many observations fall inside the bandwidth
model = smf.wls("y ~ D + x", tmp).fit()
model.summary() # the coefficient on D is the local treatment effect estimate
# define kernel weighting (triangular)
def kernel(R, c, h):
    """Triangular kernel weights for running variable R.

    Observations within bandwidth h of cutoff c get weight
    1 - |R - c| / h (1 at the cutoff, declining linearly to 0 at the
    bandwidth edge); observations outside the bandwidth get weight 0.
    Accepts a scalar or array-like R and returns weights of the same shape.
    """
    # 1.0 inside the bandwidth window, 0.0 outside
    indicator = (np.abs(R - c) <= h).astype(float)
    return indicator * (1 - np.abs(R - c) / h)
# plot the triangular kernel weights implied by bandwidth h = .9
plt.plot(df["x"], kernel(df["x"], c=0, h=.9))
plt.xlabel("x")
plt.ylabel("Weight")
plt.title("Kernel Weight by x h = .9");
# estimate model with various bandwidth and kernel weighting
# (WLS with triangular kernel weights centered on the cutoff)
model = smf.wls("y ~ D + x", df,
weights=kernel(df["x"], c=0, h=.9)).fit()
model.summary() # notice the difference made by kernel weighting - similar
# estimate but lower standard error (threshold is estimate)
# similar to R:
# summary(RDestimate(y ~ D + x, dat, subset = NULL, cutpoint = 0, bw = .9,
# kernel = "triangular", se.type = "HC1", cluster = NULL,
# verbose = FALSE, model = FALSE, frame = FALSE))
# we can use the python 'rdd' package to find the optimal bandwidth
from rdd import rdd
# identify optimal bandwidth on running variable x
bandwidth_opt = rdd.optimal_bandwidth(df['y'], df['x'], cut= 0)
print("Optimal bandwidth:", bandwidth_opt) # this matched bw from R
# restrict data to the optimal bandwidth around the cutoff
data_rdd = rdd.truncated_data(df, 'x', bandwidth_opt, cut=0)
data_rdd.shape # check how many observations remain
plt.figure(figsize=(12, 8))
plt.scatter(data_rdd['x'], data_rdd['y'], facecolors='none', edgecolors='r')
plt.xlabel('x')
plt.ylabel('y')
plt.axvline(x= 0 , color='b') # vertical line marks the cutoff
plt.show()
plt.close()
# run models with optimal bw
# use the computed bandwidth_opt directly rather than a hard-coded .906
# (the comment says "optimal bw" but the original froze an approximate value)
model = smf.wls("y ~ D + x", data_rdd,
                weights=kernel(data_rdd["x"], c=0, h=bandwidth_opt)).fit()
model.summary()
#----------------------------------
# example 2
#----------------------------------
# see also: https://mixtape.scunning.com/regression-discontinuity.html
# outcome is y2, running variable x, cutoff at x = 50
df = pd.read_csv('/Users/mattbogard/Google Drive/Python Scripts/example2_RDD.csv')
df.head()
df.shape
df.describe()
# visualize simulated data and probabilities by running variable and cutoff
plt.scatter(df['x'], df['y2'], facecolors='none', edgecolors='r')
plt.xlabel('x')
plt.ylabel('y2')
plt.axvline(x= 50 , color='b') # vertical line marks the cutoff
# basic regression on full data
model = smf.wls("y2 ~ D + x", df).fit() # threshold is our treatment var
model.summary() # same result as above
df['predictions'] = model.fittedvalues
df.head()
# overlay fitted values (green) on the raw outcome (red)
plt.scatter(df['x'], df['predictions'], facecolors='none', edgecolors='g')
plt.scatter(df['x'], df['y2'], facecolors='none', edgecolors='r')
plt.axvline(x= 50 , color='b')
plt.xlabel('x')
plt.ylabel('predictions')
# estimate regression manually on restricted bandwidth (poor person's RDD) w/o optimal kernel
mask= (df['x'] > 45) & (df['x'] < 55)
tmp = df[mask]
tmp.shape # check how many observations fall inside the bandwidth
model = smf.wls("y2 ~ D + x", tmp).fit()
model.summary()
# we can use the python 'rdd' package to find the optimal bandwidth
from rdd import rdd
# identify optimal bandwidth on running variable x
bandwidth_opt = rdd.optimal_bandwidth(df['y2'], df['x'], cut= 50)
print("Optimal bandwidth:", bandwidth_opt) # this matched bw from R
# restrict data to the optimal bandwidth around the cutoff
data_rdd = rdd.truncated_data(df, 'x', bandwidth_opt, cut=50)
data_rdd.shape # check how many observations remain
plt.figure(figsize=(12, 8))
plt.scatter(data_rdd['x'], data_rdd['y2'], facecolors='none', edgecolors='r')
plt.xlabel('x')
plt.ylabel('y')
plt.axvline(x= 50 , color='b') # vertical line marks the cutoff
plt.show()
plt.close()
# define kernel weighting (triangular)
# NOTE(review): identical to the kernel defined earlier in this script;
# redefined here so the example 2 section can be run on its own
def kernel(R, c, h):
    """Triangular kernel weights for running variable R.

    Observations within bandwidth h of cutoff c get weight
    1 - |R - c| / h (1 at the cutoff, declining linearly to 0 at the
    bandwidth edge); observations outside the bandwidth get weight 0.
    Accepts a scalar or array-like R and returns weights of the same shape.
    """
    # 1.0 inside the bandwidth window, 0.0 outside
    indicator = (np.abs(R - c) <= h).astype(float)
    return indicator * (1 - np.abs(R - c) / h)
# plot the triangular kernel weights over the restricted sample
plt.plot(data_rdd["x"], kernel(data_rdd["x"], c=50, h=bandwidth_opt))
plt.xlabel("x")
plt.ylabel("Weight")
# original title string was garbled ("optimal(8.9" with an unbalanced paren)
plt.title("Kernel Weight by x, h = optimal (~8.9)");
# run models with optimal bw
model = smf.wls("y2 ~ D + x", data_rdd,
                weights=kernel(data_rdd["x"], c=50, h=bandwidth_opt)).fit()
model.summary()
# attach fitted values for plotting
# NOTE(review): data_rdd is a subset of df, so pandas may emit a
# SettingWithCopyWarning here — harmless for this example
data_rdd['predictions'] = model.fittedvalues
data_rdd.head() # inspect the frame we just modified (original checked df by mistake)
plt.figure(figsize=(12, 8))
# overlay fitted values (green) on the raw outcome (red)
plt.scatter(data_rdd['x'], data_rdd['predictions'], facecolors='none', edgecolors='g')
plt.scatter(data_rdd['x'], data_rdd['y2'], facecolors='none', edgecolors='r')
plt.xlabel('x')
plt.ylabel('predictions')
plt.axvline(x= 50 , color='b') # vertical line marks the cutoff
# this matches R
# RDD estimate with optimal kernel
# summary(RDestimate(y2 ~ x, dat, subset = NULL, cutpoint = 50, bw = NULL,
# kernel = "triangular", se.type = "HC1", cluster = NULL,
# verbose = FALSE, model = FALSE, frame = FALSE))
#--------------------------------
# linear probability model
#--------------------------------
# binary outcome y; 'pr' holds the underlying probabilities from the simulation
# read data simulated in R
df = pd.read_csv('/Users/mattbogard/Google Drive/Python Scripts/ex_RDD.csv')
df.head()
# visualize simulated data and hypothetical probabilities by running variable and cutoff
# df.plot.scatter(x="x", y="pr")
# visualize simulated data and probabilities by running variable and cutoff
plt.scatter(df['x'], df['pr'], facecolors='none', edgecolors='r')
plt.xlabel('x')
plt.ylabel('prob')
plt.axvline(x= 0 , color='b') # vertical line marks the cutoff
# basic LPM regression on full data (OLS on the 0/1 outcome)
model = smf.wls("y ~ D + x", df).fit()
model.summary() # same result as above
df['predictions'] = model.fittedvalues
df.head()
df.plot.scatter(x="x", y="predictions", color="C0")
# estimate regression manually on restricted bandwidth (poor person's RDD)
mask= (df['x'] > -.5) & (df['x'] < .5)
tmp = df[mask]
tmp.shape # check how many observations fall inside the bandwidth
model = smf.wls("y ~ D + x", tmp).fit()
model.summary()
# define kernel weighting (triangular)
# NOTE(review): identical to the kernel defined earlier in this script;
# redefined here so the LPM section can be run on its own
def kernel(R, c, h):
    """Triangular kernel weights for running variable R.

    Observations within bandwidth h of cutoff c get weight
    1 - |R - c| / h (1 at the cutoff, declining linearly to 0 at the
    bandwidth edge); observations outside the bandwidth get weight 0.
    Accepts a scalar or array-like R and returns weights of the same shape.
    """
    # 1.0 inside the bandwidth window, 0.0 outside
    indicator = (np.abs(R - c) <= h).astype(float)
    return indicator * (1 - np.abs(R - c) / h)
# plot the triangular kernel weights implied by bandwidth h = .5
plt.plot(df["x"], kernel(df["x"], c=0, h=.5))
plt.xlabel("x")
plt.ylabel("Weight")
plt.title("Kernel Weight by x h = .5");
# run models with manually selected cutoff
model = smf.wls("y ~ D + x", df,
weights=kernel(df["x"], c=0, h=.5)).fit()
model.summary() # notice the difference made by kernel weighting - similar
# estimate but lower standard error
# same results as R LATE
# est_RDD <- RDestimate(y ~ x + D + D*x, dat, subset = NULL, cutpoint = 0, bw = .5,
# kernel = "triangular", se.type = "HC1", cluster = NULL,
# verbose = FALSE, model = FALSE, frame = FALSE)
# summary(est_RDD)
# bandwidth sensitivity check: half the bandwidth ...
model = smf.wls("y ~ D + x", df,
weights=kernel(df["x"], c=0, h=.25)).fit()
model.summary() # same results as R half bw
# ... and double the bandwidth
model = smf.wls("y ~ D + x", df,
weights=kernel(df["x"], c=0, h=1)).fit()
model.summary() # same results as R double bw
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment