Skip to content

Instantly share code, notes, and snippets.

@nilsabdi
Created August 9, 2023 22:30
Show Gist options
  • Select an option

  • Save nilsabdi/ddadd06e01d68305472f8c29d23e55cd to your computer and use it in GitHub Desktop.

Select an option

Save nilsabdi/ddadd06e01d68305472f8c29d23e55cd to your computer and use it in GitHub Desktop.
import pickle
import time
import pandas as pd
import geopandas as gpd
from libpysal.weights import Kernel
from shapely.geometry import Point
from splot import esda as esdaplot
from pysal.explore import esda
from esda.moran import Moran
import os
import numpy as np
import pysal.lib
from spreg import LMtests
from pysal.model import spreg
from scipy.sparse import csr_matrix
from libpysal.weights import WSP, W
def Moran_stat(kernel):
    """Compute Moran's I for log listing prices under the given spatial weights.

    Parameters
    ----------
    kernel : libpysal.weights.W
        Spatial weights matrix passed to the Moran statistic.

    Returns
    -------
    tuple
        (Moran's I value, permutation-based pseudo p-value).

    Side effects
    ------------
    Appends or updates a row in ./raw_test/Moran_results.csv.
    """
    # Load data
    ldn_list = pd.read_csv("./Input/LdnListings_Proc_Dist_Sent_Socio_kNN.csv")
    ln_prices = ldn_list["ln_price"].tolist()
    # Compute Moran's I statistic; p_sim is the permutation-based pseudo p-value
    moran = Moran(ln_prices, kernel)
    p_value = moran.p_sim
    # Save results
    results_file = "./raw_test/Moran_results.csv"
    # Create a DataFrame for the new results
    new_data = pd.DataFrame({"Kernel": [kernel],
                             "Moran": [moran.I],
                             "p_value": [p_value]})
    if os.path.exists(results_file):
        existing_data = pd.read_csv(results_file)
        if kernel in existing_data["Kernel"].values:
            # NOTE(review): update() aligns on the integer index, so this
            # overwrites row 0 rather than the row matching `kernel` — confirm
            # this is intended before relying on the updated file.
            existing_data.update(new_data)
        else:
            # DataFrame.append was removed in pandas 2.0; pd.concat is the
            # supported replacement with identical row-appending semantics.
            existing_data = pd.concat([existing_data, new_data],
                                      ignore_index=True)
        existing_data.to_csv(results_file, index=False)
    else:
        new_data.to_csv(results_file, index=False)
    return moran.I, p_value
# #### 2.2. LM test
# In[16]:
def spatial_lagrange_test(kernel):
    """Run Lagrange Multiplier diagnostics for spatial dependence.

    Fits a baseline (non-spatial) OLS of ln_price on the listing covariates,
    then computes the LM-lag and LM-error statistics plus their robust
    variants for the supplied weights, appending everything to
    ./raw_test/LM_results.csv.

    Parameters
    ----------
    kernel : libpysal.weights.W
        Spatial weights matrix handed to the LM tests.

    Returns
    -------
    pandas.DataFrame
        The newly computed rows of test statistics.
    """
    # Load data and variables
    listings = pd.read_csv("./Input/LdnListings_Proc_Dist_Sent_Socio_kNN.csv")
    covariates = ["accommodates", "bathrooms", "bedrooms",
                  "number_of_reviews", "score_overall",
                  "City_Dist_km", "Stat_Dist_km",
                  "Sentiment_Avg", "Sentiment_Var",
                  "Weekly earnings per Borough", "Crime Rate per Borough"]
    dependent = np.array(listings["ln_price"]).reshape((-1, 1))
    regressors = np.array(listings[covariates])
    # Baseline OLS model serving as the null for the diagnostics
    baseline = spreg.OLS(dependent, regressors)
    diagnostics = LMtests(baseline, kernel)
    df_results = pd.DataFrame({
        "Kernel": [kernel, kernel],
        "Test": ["Lagrange Multiplier (Lag)", "Lagrange Multiplier (Error)"],
        "Statistic": [diagnostics.lml[0], diagnostics.lme[0]],
        "p-value": [diagnostics.lml[1], diagnostics.lme[1]],
        # Raw (statistic, p-value) pairs for lml/lme and robust variants
        "lml": [diagnostics.lml[0], diagnostics.lml[1]],
        "lme": [diagnostics.lme[0], diagnostics.lme[1]],
        "rlml": [diagnostics.rlml[0], diagnostics.rlml[1]],
        "rlme": [diagnostics.rlme[0], diagnostics.rlme[1]],
    })
    # Save results, appending to any earlier runs
    results_file = "./raw_test/LM_results.csv"
    if os.path.exists(results_file):
        previous = pd.read_csv(results_file)
        combined = pd.concat([previous, df_results], ignore_index=True)
        combined.to_csv(results_file, index=False)
    else:
        df_results.to_csv(results_file, index=False)
    return df_results
# ### 3. Data processing: Weight matrices
# In[13]:
#Processes and Saves new kernel
def weight_proc(matrix, name, k):
    """Post-process a raw kernel weights object into a row-standardised W.

    Zeroes the diagonal, rebuilds the matrix through a sparse WSP -> W
    round-trip, row-standardises it, runs sanity checks (row sums, sign,
    isolates, diagonal), and pickles the result to "<name>_<k>_Proc.pkl".

    Parameters
    ----------
    matrix : libpysal.weights.W
        Raw kernel weights to process.
    name : str
        Kernel name used in the output filename (e.g. "Gaussian").
    k : str
        Bandwidth/parameter label used in the output filename.

    Returns
    -------
    tuple
        (dense weight matrix as ndarray, list of ids) from W.full().
    """
    # Symmetry check. The original `matrix[i][j] == matrix[j][i]` raised
    # KeyError whenever the *neighbor sets* themselves were asymmetric
    # (j lists i but not vice versa) — precisely the case we want to flag.
    # Using .get() treats a missing reverse entry as a mismatch instead.
    if not all(matrix[j].get(i) == matrix[i][j]
               for i, neighbors in matrix.neighbors.items()
               for j in neighbors):
        print("Matrix is not symmetric!")
    full_matrix, ids = matrix.full()
    # Self-weights are not meaningful here; force a zero diagonal.
    np.fill_diagonal(full_matrix, 0)
    # Convert to a sparse matrix
    sparse_matrix = csr_matrix(full_matrix)
    # Convert to WSP (sparse weights) and then to W
    wsp = WSP(sparse_matrix)
    w = wsp.to_W()
    # Row standardize
    w.transform = 'r'
    # Sanity checks
    ## Rows sum to one? (tolerance for floating-point rounding)
    count_mismatch = 0
    for i, neighbors in w.neighbors.items():
        row_sum = sum(w.weights[i])
        if abs(row_sum - 1) > 1e-10:
            count_mismatch += 1
    if count_mismatch:
        print(f"{count_mismatch} rows do not sum to 1.")
    else:
        print("All rows sum to 1.")
    ## Negative weights?
    if any(w_ij < 0 for neighbors in w.weights.values() for w_ij in neighbors):
        print("There are negative weights!")
    ## Isolated rows (observations with no neighbors)?
    isolated_rows = [i for i, neighbors in w.neighbors.items() if not neighbors]
    if isolated_rows:
        print(f"There are {len(isolated_rows)} isolated rows.")
    ## Diagonal elements zero?
    full_w_matrix, _ = w.full()
    if not all(np.diagonal(full_w_matrix) == 0):
        print("Not all diagonal elements are zero!")
    # Save as NEW kernel matrix
    with open(name + "_" + k + "_Proc.pkl", 'wb') as f:
        pickle.dump(w, f)
    return w.full()
# In[14]:
# Driver: load the listings data and the pre-computed Gaussian (bandwidth-2)
# kernel, then post-process the kernel into a row-standardised W and save it.
# The commented calls re-run the Moran and LM diagnostics on the same kernel.
ldn_list = pd.read_csv("./Input/LdnListings_Proc_Dist_Sent_Socio_kNN.csv")
with open("./Input/gaussian_2.pkl", "rb") as f:
    w = pickle.load(f)
weight_proc(w, "Gaussian", "2")
# Moran_stat(w)
# spatial_lagrange_test(w)
# ### 4. Run models
# #### 4.1. Quadratic
# #### 4.2. Gaussian
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment