conda create -n pyspark-demo
conda activate pyspark-demo
pip install streamlit "pyspark>=3.0.0"
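With the environment active, each demo file runs as a regular Streamlit app. The filenames below are assumptions (the gist does not show them), so substitute whatever names you saved the files under:

streamlit run main.py         # RDD demo
streamlit run dataframe.py    # DataFrame demo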
Pyspark Streamlit demo for university
from pyspark.rdd import RDD
from pyspark.sql import Row

import streamlit as st

from utils import _initialize_spark

st.write("# :tada: Hello Pyspark")

spark, sc = _initialize_spark()
st.write("[Link to Spark UI](http://localhost:4040)")

st.write("## Create RDD from a Python list")
l = list(range(10))
# st.write(l)

rdd = sc.parallelize(l)  # distribute the Python list as an RDD
rdd.cache()
st.write(rdd)

st.write("## Get results through actions")
st.write(rdd.collect())
st.write(rdd.take(3))
st.write(rdd.count())

st.write("## Transform RDDs")
st.write(rdd.filter(lambda x: x % 2 == 0).collect())  # talk about lazy evaluation here: filter alone returns a non-evaluated RDD
st.write(rdd.map(lambda x: x * 2).collect())
st.write(rdd.map(lambda x: x * 2).reduce(lambda x, y: x + y))  # reduce is an action: it triggers evaluation of the preceding RDDs

# Compare the two following: map keeps one list per element, flatMap flattens the results
st.write(rdd.map(lambda x: list(range(x))).collect())
st.write(rdd.flatMap(lambda x: list(range(x))).collect())

st.write("## Wordcount")
file_rdd = sc.textFile("lorem.txt")
st.write(file_rdd.collect())  # so what's inside?

st.write(
    file_rdd.flatMap(lambda sentence: sentence.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda x, y: x + y)
    .collect()
)

# spark.stop()
import pandas as pd
import pyspark.pandas as ps
import streamlit as st

from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import SQLTransformer
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql.functions import pandas_udf, upper
from pyspark.sql.types import *

from utils import _initialize_spark

spark, sc = _initialize_spark()

st.write("# :tada: Hello Pyspark DataFrame")

# Build a DataFrame from an RDD of tuples plus an explicit schema
rdd = spark.sparkContext.parallelize([('1', 'a'), ('2', 'b'), ('3', 'c'), ('4', 'd'), ('5', 'e'), ('6', 'f')])
schema = StructType([StructField('ID', StringType(), True), StructField('letter', StringType(), True)])
df = spark.createDataFrame(rdd, schema)
st.write(df)

### INIT
df = pd.read_csv("titanic.csv")
sdf = spark.createDataFrame(df)

### PySpark Dataframe
st.write(sdf.columns)
st.dataframe(sdf.toPandas())
st.write(sdf.select("PassengerId", "Survived", "Sex"))
st.write(sdf.withColumn('Name_Upper', upper(sdf.Name)))
st.write(sdf.groupby('Survived').count())

@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Simply plus one by using pandas Series.
    return series + 1

st.write(sdf.select(pandas_plus_one(sdf.Age)))

# Spark SQL on a temporary view
sdf.createOrReplaceTempView("titanic")
st.write(spark.sql('SELECT * FROM titanic WHERE Survived = 1'))
st.write(spark.sql("SELECT count(*) FROM titanic GROUP BY Survived"))

spark.udf.register("add_one", pandas_plus_one)
st.write(spark.sql("SELECT add_one(Age) FROM titanic"))

### pandas-on-Spark Dataframe
# psdf = ps.from_pandas(df)
psdf = sdf.pandas_api()
st.write(psdf["Age"])
st.write(psdf[psdf["Survived"] == 1].to_pandas())
st.write(psdf.groupby('Survived').count())
st.write(psdf.groupby('Survived')["PassengerId"].count().to_pandas())

### Titanic Final
titanic = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load("titanic.csv")
)
titanic.createOrReplaceTempView('titanic')

st.subheader("Machine Learning")

# Fill missing ages with the mean age before binarizing
mean_age = spark.sql('SELECT MEAN(Age) FROM titanic').collect()[0][0]
titanic = titanic.na.fill(mean_age, ['Age'])

binarizer = Binarizer(threshold=50, inputCol="Age", outputCol="binarized_age")
titanic_bin = binarizer.transform(titanic)
st.write(titanic_bin.toPandas())

# regexp_extract(str, pattern, idx): pull the first capture group out of Name
regex = '"(Mr)"'
sqlTrans = SQLTransformer(
    statement=f"SELECT *, regexp_extract(Name, {regex}, 1) AS Civility FROM __THIS__"
)
st.write(sqlTrans.transform(titanic).toPandas())
from typing import Tuple

import numpy as np
import pandas as pd

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession


def _initialize_spark() -> Tuple[SparkSession, SparkContext]:
    """Create a local Spark session (and its SparkContext) for the Streamlit app."""
    conf = SparkConf().setAppName("lecture-lyon2").setMaster("local")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    return spark, spark.sparkContext