Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save mkaranasou/6f039fb51d0f33707f95dea1ddbc19a8 to your computer and use it in GitHub Desktop.

Select an option

Save mkaranasou/6f039fb51d0f33707f95dea1ddbc19a8 to your computer and use it in GitHub Desktop.
Gather the features as dense vectors with a udf to train an Isolation Forest
from pyspark import SparkConf
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark_iforest.ml.iforest import IForest, IForestModel
from pyspark.ml.linalg import Vectors, VectorUDT
# --- Spark session setup --------------------------------------------------
# spark-iforest is a third-party Scala/Spark package; its jar must be on the
# classpath *before* the SparkSession is created, hence the SparkConf here.
conf = SparkConf()
conf.set('spark.jars', '/full/path/to/spark-iforest-2.4.0.jar')
spark = SparkSession \
    .builder \
    .config(conf=conf) \
    .appName("IForestExample") \
    .getOrCreate()

# Toy dataset: four numeric features per row. Rows with the large
# feature1/feature2 values are the intended anomalies.
data = [
    {'feature1': 1., 'feature2': 0., 'feature3': 0.3, 'feature4': 0.01},
    {'feature1': 10., 'feature2': 3., 'feature3': 0.9, 'feature4': 0.1},
    {'feature1': 101., 'feature2': 13., 'feature3': 0.9, 'feature4': 0.91},
    {'feature1': 111., 'feature2': 11., 'feature3': 1.2, 'feature4': 1.91},
    {'feature1': 0., 'feature2': 0., 'feature3': 0., 'feature4': 0.1},
]

df = spark.createDataFrame(data)
df.printSchema()

# Name the feature columns explicitly rather than using df.columns, which
# would silently sweep in any non-feature column present on the DataFrame
# (e.g. when cells are re-run and 'features' already exists).
feature_cols = ['feature1', 'feature2', 'feature3', 'feature4']

# Gather the features into a single dense-vector column via a UDF
# (the point of this example). NOTE: in production prefer VectorAssembler
# (already imported above) — it produces the same vector column without the
# Python-serialization overhead of a UDF:
#   df = VectorAssembler(inputCols=feature_cols,
#                        outputCol='vectorized_features').transform(df)
to_dense_vector_udf = F.udf(lambda l: Vectors.dense(l), VectorUDT())
df = df.withColumn('features', F.array(*feature_cols))
df = df.withColumn('vectorized_features', to_dense_vector_udf('features'))
df.show()
df.printSchema()
# Expected output:
# root
# |-- feature1: double (nullable = true)
# |-- feature2: double (nullable = true)
# |-- feature3: double (nullable = true)
# |-- feature4: double (nullable = true)
#
# +--------+--------+--------+--------+--------------------+--------------------+
# |feature1|feature2|feature3|feature4| features| vectorized_features|
# +--------+--------+--------+--------+--------------------+--------------------+
# | 1.0| 0.0| 0.3| 0.01|[1.0, 0.0, 0.3, 0...| [1.0,0.0,0.3,0.01]|
# | 10.0| 3.0| 0.9| 0.1|[10.0, 3.0, 0.9, ...| [10.0,3.0,0.9,0.1]|
# | 101.0| 13.0| 0.9| 0.91|[101.0, 13.0, 0.9...|[101.0,13.0,0.9,0...|
# | 111.0| 11.0| 1.2| 1.91|[111.0, 11.0, 1.2...|[111.0,11.0,1.2,1...|
# | 0.0| 0.0| 0.0| 0.1|[0.0, 0.0, 0.0, 0.1]| [0.0,0.0,0.0,0.1]|
# +--------+--------+--------+--------+--------------------+--------------------+
#
# root
# |-- feature1: double (nullable = true)
# |-- feature2: double (nullable = true)
# |-- feature3: double (nullable = true)
# |-- feature4: double (nullable = true)
# |-- features: array (nullable = false)
# | |-- element: double (containsNull = true)
# |-- vectorized_features: vector (nullable = true)

# Scale features to unit standard deviation (StandardScaler's withMean
# defaults to False, so the data is not centered — sparse-safe default).
scaler = StandardScaler(inputCol='vectorized_features',
                        outputCol='scaled_features')

# contamination=0.3: the model will flag roughly 30% of rows as anomalies
# (reasonable for this 5-row toy set with ~2 outliers). Shallow trees
# (maxDepth=2) are enough here.
iforest = IForest(contamination=0.3, maxDepth=2)
iforest.setFeaturesCol('scaled_features')
iforest.setSeed(42)  # for reproducibility

scaler_model = scaler.fit(df)
df = scaler_model.transform(df)
model = iforest.fit(df)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment