from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, when
from pyspark import SparkConf
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.mllib.evaluation import MulticlassMetrics
from datetime import datetime
import pandas as pd
import numpy as np
conf = SparkConf().setAll([
    ("spark.network.timeout", "480s"),
    ("spark.executor.heartbeatInterval", "120s"),
])
sc = SparkContext('local[2]', '', conf=conf)
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)
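# Note (assumption, not in the original gist): on Spark 3.x, SparkSession is the
# preferred entry point; an equivalent setup would be
#   from pyspark.sql import SparkSession
#   spark = SparkSession.builder.master("local[2]").config(conf=conf).getOrCreate()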
# Read the training data frame from the training CSV
df_train = sqlContext.read.csv(
    "C:\\spark\\spark-3.0.0-preview2-bin-hadoop2.7\\kddcup_data\\kddcup_data_corrected.csv",
    # "C:\\spark\\spark-3.0.0-preview2-bin-hadoop2.7\\kddcup99_csv.csv",
    header=True,
    inferSchema=True
)
# Binarize the label: "normal." -> 0.0, every attack type -> 1.0
df_train = df_train.withColumn("label", when(col("label") == "normal.", 0.0).otherwise(1.0))
# Treat string columns as categorical; everything else except the label is numeric
# (the label must be excluded here, otherwise it leaks into the feature vector).
categoricalCols = [c for c, dtype in df_train.dtypes if dtype == 'string' and c != 'label']
continuousCols = [c for c, dtype in df_train.dtypes if dtype != 'string' and c != 'label']
# Index the categorical columns and assemble all features into a single vector.
# handleInvalid="keep" reserves an index for category values seen only in the test set.
indexer = StringIndexer(inputCols=categoricalCols,
                        outputCols=["{0}Index".format(c) for c in categoricalCols],
                        handleInvalid="keep")
assemblerInputs = indexer.getOutputCols() + continuousCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
pipeline = Pipeline(stages=[indexer, assembler])
pipeline_model = pipeline.fit(df_train)
df_train = pipeline_model.transform(df_train)
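# Optional sanity check (not in the original gist): peek at the assembled feature vectors.
# df_train.select("features", "label").show(3, truncate=False)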
# Read the testing data frame from the testing CSV
df_test = sqlContext.read.csv(
    "C:\\spark\\spark-3.0.0-preview2-bin-hadoop2.7\\kddcup_data\\corrected.csv",
    header=True,
    inferSchema=True
)
df_test = df_test.withColumn("label", when(col("label") == "normal.", 0.0).otherwise(1.0))
# Reuse the pipeline fitted on the training data so the categorical indices
# in the test set match the ones the model was trained on.
df_test = pipeline_model.transform(df_test)
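# Optional (assumption): both frames are reused several times below (counts, fit,
# transform), so caching avoids re-reading and re-indexing the CSVs on each action.
# df_train.cache()
# df_test.cache()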
# Class balance in the training data and overall train/test sizes
count_normal = df_train.filter(col("label") == 0).count()
count_abnormal = df_train.filter(col("label") == 1).count()
print("{:*^100}".format(""))
print("Training data frame")
print("Normal: \t{}".format(count_normal))
print("Anomaly: \t{}".format(count_abnormal))
df_train_count = df_train.count()
df_test_count = df_test.count()
print("Train: \t{}".format(df_train_count))
print("Test: \t{}".format(df_test_count))
print("{:*^100}".format(""))
# Training: random forest with maxBins large enough to cover the distinct
# values of the indexed categorical features (e.g. the many service types).
rdc = RandomForestClassifier(labelCol="label", featuresCol="features",
                             predictionCol="prediction", probabilityCol="probability",
                             maxBins=100)
print("Training...")
start_train = datetime.now()
model = rdc.fit(df_train)
time_train = datetime.now() - start_train
print("Finished training in {:.2f} seconds".format(time_train.total_seconds()))
predictions = model.transform(df_test)
# MulticlassMetrics expects an RDD of (prediction, label) pairs of floats.
prediction_and_label = predictions.select("prediction", "label").rdd.map(
    lambda row: (float(row[0]), float(row[1])))
metrics = MulticlassMetrics(prediction_and_label)
print("Confusion Matrix:")
for line in metrics.confusionMatrix().toArray():
    print(line)
# Rates with the attack class (1.0) as positive; the true/false positive rates
# of class 0.0 correspond to the TNR and FNR of the positive class.
tpr = metrics.truePositiveRate(1.0)
fpr = metrics.falsePositiveRate(1.0)
tnr = metrics.truePositiveRate(0.0)
fnr = metrics.falsePositiveRate(0.0)
accuracy = metrics.accuracy
precision = metrics.precision(1.0)
recall = metrics.recall(1.0)
print("TPR: {:.3%} \tFPR: {:.3%}".format(tpr, fpr))
print("TNR: {:.3%} \tFNR: {:.3%}".format(tnr, fnr))
print("Precision: {:.3%} \tRecall: {:.3%} \tAccuracy: {:.3%}".format(precision, recall, accuracy))
print("{:*^100}".format(""))