Last active
August 16, 2021 06:35
-
-
Save jamm1985/39783d2e6f20340116d6e14a73749fe6 to your computer and use it in GitHub Desktop.
Perform logistic regression for landslide prediction based on rainfall measurements
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
File: landslide_rainfall_logit.py | |
Author: Andrey Stepnov | |
Email: [email protected], [email protected] | |
Github: https://github.com/jamm1985 | |
Description: Perform logistic regression for landslide prediction | |
based on rainfall measurements. | |
Classification tests, plots and metrics! | |
""" | |
import numpy as np | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
import seaborn as sns | |
from statsmodels.formula.api import logit | |
from statsmodels.graphics.regressionplots import plot_partregress_grid | |
from stargazer.stargazer import Stargazer | |
from sklearn.metrics import classification_report | |
from sklearn.metrics import precision_recall_curve | |
from sklearn.metrics import balanced_accuracy_score | |
# read csv from wr43988.zip with day Temp and Rainfall data | |
# Vladivostok station | |
# source http://meteo.ru/it/178-aisori | |
rainfall_raw = pd.read_csv( | |
"../DATA/wr43988/wr43988.txt", | |
sep=";", | |
header=None, | |
parse_dates=[[1, 2, 3]], | |
skipinitialspace=True, | |
na_values=" ") | |
# set column names from data description fld43988a0.txt | |
rainfall_raw.columns =\ | |
['date', | |
'index_vmo', | |
'temperature_quality_feature', | |
'min_day_temp', | |
'avg_day_temp', | |
'max_day_temp', | |
'total_rainfall'] | |
print("count NA for Temp and Rainfall data \n{}".format( | |
rainfall_raw.isna().sum())) | |
# drop index_vmp and temperature_quality_feature column | |
rainfall_raw = rainfall_raw.drop(columns=['index_vmo', | |
'temperature_quality_feature']) | |
# read csv from wr43990.zip with day Soil temperature at depth | |
# Vladivostok station | |
# source http://meteo.ru/it/178-aisori | |
soil_temp_depth_raw = pd.read_csv( | |
"../DATA/wr43990/wr43990.txt", | |
sep=";", | |
header=None, | |
parse_dates=[[1, 2, 3]], | |
skipinitialspace=True, | |
na_values="999.9") | |
# set column names from data description fld43990a0.txt | |
soil_temp_depth_raw.columns =\ | |
['date', | |
'index_vmo', | |
'temp_2_cm', | |
'temp_5_cm', | |
'temp_10_cm', | |
'temp_15_cm', | |
'temp_20_cm', | |
'temp_40_cm', | |
'temp_60_cm', | |
'temp_80_cm', | |
'temp_120_cm', | |
'temp_160_cm', | |
'temp_240_cm', | |
'temp_320_cm'] | |
print("count NA for Soil temperature at depth data \n{}".format( | |
soil_temp_depth_raw.isna().sum())) | |
# convert date column to datetime64 and mark NaT out-of-bounds dates | |
soil_temp_depth_raw['date'] =\ | |
pd.to_datetime(soil_temp_depth_raw['date'], errors='coerce') | |
# drop out-of-bounds dates rows | |
soil_temp_depth_raw =\ | |
soil_temp_depth_raw[soil_temp_depth_raw['date'].notnull()] | |
# drop index_vmo column and columns with NA more than 50% | |
soil_temp_depth_raw =\ | |
soil_temp_depth_raw.drop( | |
columns=['index_vmo', | |
'temp_2_cm', | |
'temp_5_cm', | |
'temp_10_cm', | |
'temp_15_cm', | |
'temp_60_cm', | |
'temp_320_cm']) | |
# join two dataframes by date key | |
DATA = rainfall_raw.join( | |
soil_temp_depth_raw.set_index('date'), on='date') | |
print("count NA for accumulated DATA \n{}".format( | |
DATA.isna().sum())) | |
# drop rows before 1946-01-01 | |
DATA = DATA[DATA['date'] >= '1946-01-01'] | |
DATA = DATA.reset_index(drop=True) | |
# landslide cases | |
landslides = pd.read_excel("../DATA/landslides_cases.xlsx") | |
# join data | |
DATA = DATA.join(landslides.set_index('DATE'), on='date') | |
# assume that all landslide did not occur at any other day | |
DATA['LANDSLIDE'] = DATA['LANDSLIDE'].fillna(0) | |
# add cumulative rainfall during YEAR | |
# I trust there is more elegant and efficient way to do this! | |
DATA['cumm_year_rainfall'] = 0.0 | |
for i in range(0, len(DATA)): | |
DATA.cumm_year_rainfall[i] =\ | |
DATA[(DATA['date'] > DATA.iloc[i].date - pd.offsets.YearBegin(1)) | |
& (DATA['date'] < DATA.iloc[i].date)]['total_rainfall'].sum() | |
# add antecedent rainfall values for each row | |
# IETD = 1 day, see Hong, M., Kim, J., Jeong, S., 2018. Landslides 15, 523–534. | |
# and again there is more elegant way to do this! | |
DATA['antecedent_rainfall'] = 0.0 | |
for i in range(1, len(DATA)): | |
j = 1 | |
antecedent_rainfall_value = 0 | |
while DATA.iloc[i-j].total_rainfall != 0: | |
if pd.isna(DATA.total_rainfall.iloc[i-j]): | |
pass | |
else: | |
antecedent_rainfall_value +=\ | |
DATA.iloc[i-j].total_rainfall | |
j += 1 | |
# print(i, j) | |
DATA.antecedent_rainfall[i] = antecedent_rainfall_value | |
# save DATA to xlsx | |
DATA.to_excel("../DATA/all_variables.xlsx") | |
# load data if you lost you ipython session | |
# DATA = pd.read_excel("../DATA/all_variables.xlsx") | |
# summary table about DATA | |
summary_columns = ['count', 'mean', 'std', 'min', 'max'] | |
format_table = {'count': '{:.0f}', | |
'mean': '{:.3f}', | |
'std': '{:.3f}', | |
'min': '{:.1f}', | |
'max': '{:.1f}', | |
} | |
with open('../RESULTS/summary_all_data.html', 'w') as f: | |
print( | |
DATA.describe().T[summary_columns].style.format(format_table).render(), | |
file=f) | |
# see landslides cases over day temp and rainfalls | |
CASES = DATA[DATA['LANDSLIDE'] == 1][[ | |
'date', | |
'avg_day_temp', | |
'total_rainfall', | |
'cumm_year_rainfall', | |
'antecedent_rainfall']] | |
print("All landslide cases\n{}".format(CASES)) | |
print("And some stats about it\n{}".format(CASES.describe())) | |
# For logit: fight a QUASI-COMPLETE SEPARATION | |
rainfall_threshold =\ | |
CASES['total_rainfall'].min() - DATA['total_rainfall'].std() | |
print("Rainfall threshold is {}".format(rainfall_threshold)) | |
# logit! | |
REGRESSION_SAMPLE = DATA[DATA['total_rainfall'] >= rainfall_threshold] | |
REGRESSION_SAMPLE = REGRESSION_SAMPLE.reset_index(drop=True) | |
REGRESSION_SAMPLE = REGRESSION_SAMPLE.astype({'LANDSLIDE': 'int32'}) | |
# summary table about train set | |
summary_columns = ['count', 'mean', 'std', 'min', 'max'] | |
format_table = {'count': '{:.0f}', | |
'mean': '{:.3f}', | |
'std': '{:.3f}', | |
'min': '{:.1f}', | |
'max': '{:.1f}', | |
} | |
with open('../RESULTS/summary_train_set.html', 'w') as f: | |
print( | |
REGRESSION_SAMPLE.describe().T[summary_columns].style.format( | |
format_table).render(), | |
file=f) | |
# R-style logit! | |
logit_mod = logit( | |
"LANDSLIDE ~" | |
" total_rainfall" | |
" + antecedent_rainfall" | |
" + cumm_year_rainfall", | |
REGRESSION_SAMPLE) | |
logit_res = logit_mod.fit() | |
print(logit_res.summary()) | |
print("\n") | |
print(logit_res.get_margeff().summary()) | |
# write results to a table | |
stargazer = Stargazer([logit_res]) | |
with open('../RESULTS/logit.html', 'w') as f: | |
print(stargazer.render_html(), file=f) | |
# optimal threshold | |
# see | |
# https://machinelearningmastery.com/threshold-moving-for-imbalanced-classification/ | |
y_true = REGRESSION_SAMPLE['LANDSLIDE'].to_numpy() | |
# calculate roc curves | |
precision, recall, thresholds =\ | |
precision_recall_curve(y_true, logit_res.predict()) | |
# convert to f score | |
fscore = (2 * precision * recall) / (precision + recall) | |
# locate the index of the largest f score | |
ix = np.argmax(fscore) | |
print('Best Threshold=%f, F-Score=%.3f' % (thresholds[ix], fscore[ix])) | |
# Model performance metrics | |
y_true = REGRESSION_SAMPLE['LANDSLIDE'].to_numpy() | |
y_pred = np.where(logit_res.predict() >= thresholds[ix], 1, 0) | |
# precision, recall, f1, overall accuracy | |
print(classification_report(y_true, | |
y_pred, | |
target_names=['Not a Landslide', | |
'Landslide'])) | |
# confusion matrix for the best threshold | |
print("confusion matrix\n{}".format( | |
logit_res.pred_table(threshold=thresholds[ix]))) | |
# balanced accuracy | |
print("ballanced accuracy {}".format( | |
balanced_accuracy_score(y_true, y_pred))) | |
# PLOTTING SECTION! | |
# plot best threshold for logit model (Precision-Recall Curve) | |
plt.clf() | |
plt.plot(recall, precision, marker='.', | |
color='gray', label='Logit(P)') | |
plt.scatter(recall[ix], precision[ix], s=80, | |
marker='o', color='black', label='Best F1 score') | |
# axis labels | |
plt.xlabel('Recall') | |
plt.ylabel('Precision') | |
plt.legend() | |
# show the plot | |
plt.savefig("../FIGS/best_threshold.png", dpi=600, format='png') | |
plt.close() | |
# plot best threshold for logit model (F1 score and Balanced Accuracy Curve) | |
# calculate balanced accuracy | |
balanced_accuracy = np.array([]) | |
for i in np.insert(thresholds, 0, 0.0): | |
y_pred_iteration = np.where(logit_res.predict() >= i, 1, 0) | |
balanced_accuracy =\ | |
np.append(balanced_accuracy, | |
balanced_accuracy_score(y_true, y_pred_iteration)) | |
plt.clf() | |
plt.figure(figsize=(8, 6)) | |
plt.plot(np.insert(thresholds, 0, 0.0), fscore, marker='.', | |
color='gray', label='F1score') | |
# plt.plot(thresholds, balanced_accuracy, marker='x', | |
# color='gray', label='Balanced Accuracy') | |
plt.scatter(np.insert(thresholds, 0, 0.0)[ix], fscore[ix], s=80, | |
marker='o', color='black', label='Best F1 score') | |
# axis labels | |
plt.xlabel('Threshold') | |
plt.ylabel('F1 score') | |
plt.legend() | |
# show the plot | |
plt.savefig("../FIGS/F1score_threshold.png", dpi=600, format='png') | |
plt.close() | |
# plot threshold and precision/recall | |
plt.clf() | |
plt.figure(figsize=(8, 6)) | |
plt.plot(np.insert(thresholds, 0, 0.0), recall, marker='.', | |
color='gray', label='Recall') | |
plt.plot(np.insert(thresholds, 0, 0.0), precision, marker='x', | |
color='gray', label='Precision') | |
plt.scatter(np.insert(thresholds, 0, 0.0)[ix+1], fscore[ix], s=80, | |
marker='o', color='black', label='Best F1 score') | |
# axis labels | |
plt.xlabel('Threshold') | |
# plt.ylabel('') | |
plt.legend() | |
# show the plot | |
plt.savefig("../FIGS/precision_recall_threshold.png", dpi=600, format='png') | |
plt.close() | |
# correlation matrix all the DATA | |
correlation_matrix = DATA.corr() | |
# clear plot object | |
plt.clf() | |
plt.figure(figsize=(18, 14)) | |
# rename axis to human-like style | |
# plot and save fig | |
ax_1 = sns.heatmap( | |
correlation_matrix, | |
annot=True, | |
square=True, | |
linewidths=.5) | |
ax_1.set_title('Correlation matrix') | |
plt.savefig("../FIGS/correlation_matrix.png", dpi=600, format='png') | |
# plt.show() | |
plt.close() | |
# print only rainfall correlation matrix | |
# clear plot object | |
plt.clf() | |
correlation_matrix_rainfall =\ | |
DATA[['total_rainfall', | |
'cumm_year_rainfall', | |
'antecedent_rainfall', | |
'LANDSLIDE']].corr() | |
lables = ["Daily\nrainfall", | |
"Cumulative\nprecipitation", | |
"Antecedent\nrainfall", | |
"Landslide"] | |
correlation_matrix_rainfall.columns = lables | |
correlation_matrix_rainfall.index = lables | |
plt.figure(figsize=(10, 8)) | |
# fix bug with text alignment on y-axes | |
# https://github.com/mwaskom/seaborn/issues/1820 | |
plt.setp(plt.gca().yaxis.get_majorticklabels(), va="center") | |
ax_1 = sns.heatmap( | |
correlation_matrix_rainfall, | |
annot=True, | |
square=True, | |
linewidths=.5) | |
# ax_1.set_title('Correlation matrix') | |
plt.savefig("../FIGS/correlation_matrix_rainfall.png", dpi=600, format='png') | |
# plt.show() | |
plt.close() | |
# plot Landslide dummy variable on Day and Year cumulative rainfall axes | |
# Fig 5 | |
# clear plot object | |
plt.clf() | |
lm = sns.lmplot( | |
x='total_rainfall', | |
y='cumm_year_rainfall', | |
hue='LANDSLIDE', | |
# logistic=True, | |
fit_reg=False, | |
markers=[".", "+"], | |
legend_out=True, | |
palette=["lightgray", "black"], | |
data=DATA) | |
lm = lm.set_axis_labels("Daily rainfall, mm", "Cumulative precipitation, mm") | |
lm._legend.set_title("Landslide") | |
new_labels = ['No', 'Yes'] | |
for t, l in zip(lm._legend.texts, new_labels): t.set_text(l) | |
# lm.fig.suptitle('Landslide occurrence depends on rainfall (mm)') | |
# plot decision boundary P=0.5 | |
X_05 = np.array([84, 128]) | |
# set one variable to median and solve for P=0.5 | |
Y_05 = (logit_res.params['Intercept'] + | |
logit_res.params['total_rainfall'] * X_05 + | |
logit_res.params['antecedent_rainfall'] * | |
REGRESSION_SAMPLE['antecedent_rainfall'].mean()) /\ | |
-logit_res.params['cumm_year_rainfall'] | |
linewidth = 0.6 | |
fontsize = 8 | |
plt.plot(X_05, Y_05, '--', color='black', linewidth=linewidth, label='P=0.5') | |
plt.text(X_05[0], Y_05[0], '0.50', | |
ha='right', va='center', | |
backgroundcolor='white', | |
fontsize=fontsize) | |
# Plot decision boundary for P=BEST | |
X_best = np.array([64, 108]) | |
Y_best = np.log(thresholds[ix]/(1-thresholds[ix])) /\ | |
logit_res.params['cumm_year_rainfall'] +\ | |
(logit_res.params['Intercept'] + | |
logit_res.params['total_rainfall'] * X_best + | |
logit_res.params['antecedent_rainfall'] * | |
REGRESSION_SAMPLE['antecedent_rainfall'].mean()) /\ | |
-logit_res.params['cumm_year_rainfall'] | |
plt.plot(X_best, Y_best, '-', color='black', linewidth=linewidth) | |
plt.text(X_best[0], Y_best[0], '{:.2f}'.format(thresholds[ix]), | |
ha='right', va='center', | |
backgroundcolor='white', | |
fontsize=fontsize) | |
plt.savefig('../FIGS/scatter_categorical_plot_1.png', dpi=600, format='png') | |
plt.close() | |
# plot Landslide dummy variable on Day and antecedent cumulative rainfall axes | |
# Fig 6 | |
# clear plot object | |
plt.clf() | |
lm = sns.lmplot( | |
x='total_rainfall', | |
y='antecedent_rainfall', | |
hue='LANDSLIDE', | |
# logistic=True, | |
fit_reg=False, | |
markers=[".", "+"], | |
legend_out=True, | |
palette=["lightgray", "black"], | |
data=DATA) | |
lm = lm.set_axis_labels("Daily rainfall, mm", "Antecedent rainfall, mm") | |
lm._legend.set_title("Landslide") | |
new_labels = ['No', 'Yes'] | |
for t, l in zip(lm._legend.texts, new_labels): t.set_text(l) | |
# lm.fig.suptitle('Landslide occurrence depends on rainfall (mm)') | |
# plot decision boundary for P=0.5 | |
# set one variable to median and solve for P=0.5 | |
X_05 = np.array([60, 120]) | |
Y_05 = (logit_res.params['Intercept'] + | |
logit_res.params['total_rainfall'] * X_05 + | |
logit_res.params['cumm_year_rainfall'] * | |
REGRESSION_SAMPLE['cumm_year_rainfall'].mean()) /\ | |
-logit_res.params['antecedent_rainfall'] | |
linewidth = 0.6 | |
fontsize = 8 | |
plt.plot(X_05, Y_05, '--', color='black', linewidth=linewidth, label='P=0.5') | |
plt.text(X_05[0], Y_05[0], '0.50', | |
ha='right', va='center', | |
backgroundcolor='white', | |
fontsize=fontsize) | |
# Plot decision boundary for P=BEST | |
X_best = np.array([40, 98]) | |
Y_best = np.log(thresholds[ix]/(1-thresholds[ix])) /\ | |
logit_res.params['antecedent_rainfall'] +\ | |
(logit_res.params['Intercept'] + | |
logit_res.params['total_rainfall'] * X_best + | |
logit_res.params['cumm_year_rainfall'] * | |
REGRESSION_SAMPLE['cumm_year_rainfall'].mean()) /\ | |
-logit_res.params['antecedent_rainfall'] | |
plt.plot(X_best, Y_best, '-', color='black', linewidth=linewidth) | |
plt.text(X_best[0], Y_best[0]+5, '{:.2f}'.format(thresholds[ix]), | |
ha='right', va='center', | |
backgroundcolor='white', | |
fontsize=fontsize) | |
plt.savefig('../FIGS/scatter_categorical_plot_2.png', dpi=600, format='png') | |
plt.close() | |
# plot cumulative rainfall distributions by months | |
# Fig 3 | |
RAINFAL_BY_MONTH =\ | |
DATA[['date', 'total_rainfall']].set_index('date').resample('M').sum() | |
RAINFAL_BY_MONTH['Month'] =\ | |
RAINFAL_BY_MONTH.index.strftime('%b') | |
plt.clf() | |
plt.figure(figsize=(8, 6)) | |
ax = sns.boxplot(y="total_rainfall", | |
x="Month", | |
palette=["black"], | |
data=RAINFAL_BY_MONTH) | |
plt.setp(ax.artists, edgecolor='k', facecolor='w') | |
plt.setp(ax.lines, color='k') | |
plt.ylabel('Precipitation, mm') | |
plt.savefig('../FIGS/rainfall_by_month.png', dpi=600, format='png') | |
plt.close() | |
# plot cases by total rainfall and date time | |
CASES_fig = CASES[['date', 'total_rainfall']] | |
CASES_fig['month'] = CASES_fig['date'].dt.month_name() | |
plt.clf() | |
plt.figure(figsize=(8, 6)) | |
lm = sns.lmplot( | |
x='date', | |
y='total_rainfall', | |
hue='month', | |
hue_order=['July', 'August', 'September', 'October'], | |
# logistic=True, | |
fit_reg=False, | |
markers=["x", "+", ".", "2"], | |
legend_out=True, | |
# palette=["black"], | |
data=CASES_fig) | |
lm = lm.set_axis_labels("Year", "Rainfall intensity, mm") | |
lm._legend.set_title("Landslide") | |
plt.savefig('../FIGS/scatter_categorical_plot_4.png', dpi=600, format='png') | |
plt.close() | |
# Partial regression plot | |
CORRECT_NAMES = REGRESSION_SAMPLE[['total_rainfall', | |
'antecedent_rainfall', | |
'cumm_year_rainfall', | |
'LANDSLIDE']] | |
CORRECT_NAMES.columns = ['Daily_rainfall', | |
'Antecedent_rainfall', | |
'Cumulative_precipitation', | |
'Landslide'] | |
logit_mod = logit( | |
"Landslide ~" | |
" Daily_rainfall" | |
" + Antecedent_rainfall" | |
" + Cumulative_precipitation", | |
CORRECT_NAMES) | |
logit_res = logit_mod.fit() | |
print(logit_res.summary()) | |
print("\n") | |
print(logit_res.get_margeff().summary()) | |
plt.clf() | |
fig = plt.figure(figsize=(10, 8)) | |
plot_partregress_grid(logit_res, fig=fig) | |
plt.savefig('../FIGS/partial_regression_plot.png', dpi=600, format='png') | |
plt.close() |
Citation
Stepnova, Y.A., Stepnov, A.A., Konovalov, A.V. et al. Predictive Model of Rainfall-Induced Landslides in High-Density Urban Areas of the South Primorsky Region (Russia). Pure Appl. Geophys. (2021). https://doi.org/10.1007/s00024-021-02822-y
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Historical meteorological data:
Landslide cases: