Last active
April 4, 2020 04:26
-
-
Save tuanpt98/2041fbc2fa034f83104755aa73cebc6d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from pyspark import SparkContext | |
| from pyspark.sql import SQLContext | |
| from pyspark.sql.functions import * | |
| from pyspark import SparkFiles, SparkConf | |
| from pyspark.ml import Pipeline | |
| from pyspark.ml.classification import RandomForestClassifier | |
| from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler | |
| from pyspark.mllib.evaluation import MulticlassMetrics | |
| from datetime import datetime | |
| import pandas as pd | |
| import numpy as np | |
# Spark bootstrap: widen the network timeout and executor heartbeat so long
# local fits are not dropped mid-run, then open a 2-core local context.
spark_conf = SparkConf()
spark_conf.set("spark.network.timeout", "480s")
spark_conf.set("spark.executor.heartbeatInterval", "120s")

sc = SparkContext('local[2]', '', conf=spark_conf)
sc.setLogLevel("ERROR")  # keep console output down to real errors only
sqlContext = SQLContext(sc)
# ---------------------------------------------------------------------------
# Load the KDD Cup '99 train/test CSVs and build ONE shared feature pipeline.
#
# Fixes vs. the previous version:
#   * both file paths contained a stray backslash ("hadoop2.7\\\kddcup_data"),
#     which put a literal double backslash into the path;
#   * the StringIndexer/VectorAssembler pipeline was fitted separately on the
#     train and test frames, so the same category string could map to a
#     different index in each frame — the model would then be evaluated on
#     inconsistently encoded features.  The pipeline is now fitted ONCE on
#     the training data and the same fitted model transforms both frames.
# ---------------------------------------------------------------------------

def _load_kdd_csv(path):
    """Read one KDD CSV and binarise the label: 0.0 = "normal.", 1.0 = attack."""
    df = sqlContext.read.csv(SparkFiles.get(path), header=True, inferSchema=True)
    # NOTE(review): replace() only rewrites cells whose WHOLE value equals "."
    # (it does not strip trailing dots from labels) — confirm this is intended.
    df = df.replace('.', '\n')
    return df.withColumn("label",
                         when(col("label") == "normal.", 0.0).otherwise(1.0))

df_train = _load_kdd_csv(
    "C:\\spark\\spark-3.0.0-preview2-bin-hadoop2.7\\kddcup_data\\kddcup_data_corrected.csv")
df_test = _load_kdd_csv(
    "C:\\spark\\spark-3.0.0-preview2-bin-hadoop2.7\\kddcup_data\\corrected.csv")

# String columns (other than the label) are index-encoded; numeric columns
# pass straight through to the assembler.
categoricalCols = [c for c, dtype in df_train.dtypes
                   if dtype == 'string' and c != 'label']
continiousCols = [c for c, dtype in df_train.dtypes if dtype != 'string']

indexer = StringIndexer(
    inputCols=categoricalCols,
    outputCols=["{0}Index".format(c) for c in categoricalCols],
    handleInvalid="keep",  # categories unseen at fit time get an extra index
)
assembler = VectorAssembler(inputCols=indexer.getOutputCols() + continiousCols,
                            outputCol="features")
stages = [indexer, assembler]
pipeline = Pipeline(stages=stages)

# Fit on the training frame only, then apply the SAME fitted model to both.
pipelineModel = pipeline.fit(df_train)
df_train = pipelineModel.transform(df_train)
df_test = pipelineModel.transform(df_test)
# Report the class balance of the training frame plus overall row counts.
count_normal = df_train.filter(col("label") == 0).count()
count_abnormal = df_train.filter(col("label") == 1).count()
print("{:*^100}".format(""))
print("data_frame training")  # fixed typo ("trainning") and dropped no-op .format("")
print("Normal: \t{}".format(count_normal))
print("Anomaly: \t{}".format(count_abnormal))
df_train_count = df_train.count()
df_test_count = df_test.count()
print("Train: \t{}".format(df_train_count))
print("Test: \t{}".format(df_test_count))
print("{:*^100}".format(""))
# Fit a random forest on the encoded training frame, time the fit, then score
# the held-out test frame and print the raw confusion matrix.
forest = RandomForestClassifier(
    labelCol="label",
    predictionCol="prediction",
    probabilityCol="probability",
    maxBins=100,
)
print("Training...")
t0 = datetime.now()
model = forest.fit(df_train)
elapsed = datetime.now() - t0
print("Finished training in {:.2f} seconds".format(elapsed.total_seconds()))

predictions = model.transform(df_test)
# RDD of (prediction, label) pairs feeds the RDD-based metrics API.
metrics = MulticlassMetrics(predictions.select("prediction", col("label")).rdd)

print("Confusion Matrix:")
for matrix_row in metrics.confusionMatrix().toArray():
    print(matrix_row)
# Binary summary with class 1.0 (anomaly) as the positive class.
positive, negative = 1.0, 0.0
tpr = metrics.truePositiveRate(positive)
fpr = metrics.falsePositiveRate(positive)
# From the positive class's point of view, the negative class's TPR is the
# true-negative rate and its FPR is the false-negative rate.
tnr = metrics.truePositiveRate(negative)
fnr = metrics.falsePositiveRate(negative)
precision = metrics.precision(positive)
recall = metrics.recall(positive)
accuracy = metrics.accuracy

print("TPR: {:.3%} \tFPR: {:.3%}".format(tpr, fpr))
print("TNR: {:.3%} \tFNR: {:.3%}".format(tnr, fnr))
print("Precision: {:.3%} \tRecall: {:.3%} \tAccuracy: {:.3%}".format(precision, recall, accuracy))
print("{:*^100}".format(""))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment