print(sc)
print(spark)
# <pyspark.sql.session.SparkSession at 0x7f8df8673ba8>
# -------------------------------------------------------------------------------
# Import PySpark Libraries
# -------------------------------------------------------------------------------
import math
import re
import pandas as pd
import numpy as np
import datetime
from pyspark.sql.functions import skewness, kurtosis, var_pop, var_samp, stddev, stddev_pop, sumDistinct, ntile, udf, col, desc
from pyspark.sql.functions import split, explode, substring, upper, trim, lit, length, regexp_replace, when, concat, coalesce, countDistinct, expr
# 'udf' stands for 'user-defined function': a thin wrapper around a plain Python function
# that tells Spark how to apply it to a DataFrame column, element by element. It should
# become clearer when we use one below.
from pyspark.sql.types import IntegerType, StringType, DateType
from pyspark.sql.types import StructField, StructType
from pyspark.sql import DataFrame, Row
from functools import reduce
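# -------------------------------------------------------------------------------
# A minimal udf sketch (added illustration; 'name_length_udf' and 'NEW_COL' are
# placeholder names, not from the original gist): wrap a plain Python function,
# declare its return type, and apply it to a column like any built-in function.
# -------------------------------------------------------------------------------
name_length_udf = udf(lambda x: len(x) if x is not None else None, IntegerType())
df = df.withColumn('NEW_COL', name_length_udf(df['COL1']))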
# -------------------------------------------------------------------------------
# Please refer to http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html for the full reference.
# -------------------------------------------------------------------------------
# How to select columns?
# -------------------------------------------------------------------------------
# Assume that the DataFrame 'df' has already been loaded.
# Replace 'COL1' and 'COL2' with your column names.
df.select(['COL1', 'COL2'])
# -------------------------------------------------------------------------------
# How to trim the whitespace?
# How to uppercase the elements in the feature?
# -------------------------------------------------------------------------------
df.withColumn('COL1', trim(upper(df['COL1'])))
# -------------------------------------------------------------------------------
# How to filter for non-null rows in the feature?
# -------------------------------------------------------------------------------
df.where(df['COL1'].isNotNull())   # drops real nulls
df.where(df['COL1'] != 'NaN')      # drops rows where the value is the literal string 'NaN'
# -------------------------------------------------------------------------------
# How to drop missing rows from the dataframe?
# dropna(): returns a new DataFrame omitting rows with null values.
# -------------------------------------------------------------------------------
df.dropna()
# -------------------------------------------------------------------------------
# split: splits str around a pattern (the pattern is a regular expression). In this case, by ';'
# explode: returns a new row for each element in the given array or map.
# toDF: returns a new DataFrame with the specified column names.
# -------------------------------------------------------------------------------
df.select(df['COL1'], explode(split(df['COL2'], ';'))).toDF(*['COL1', 'COL2'])
# -------------------------------------------------------------------------------
# lit: creates a Column of literal value.
# -------------------------------------------------------------------------------
df.withColumn('COL1', lit(1))
# -------------------------------------------------------------------------------
# Returns a new DataFrame omitting rows with null values. DataFrame.dropna() and
# DataFrameNaFunctions.drop() are aliases of each other.
# Parameters:
# how - 'any' or 'all'. If 'any', drop a row if it contains any nulls. If 'all', drop a row only if all its values are null.
# thresh - int, default None. If specified, drop rows that have fewer than thresh non-null values. This overrides the how parameter.
# subset - optional list of column names to consider.
# -------------------------------------------------------------------------------
df.dropna(subset=['COL1'], how='any', thresh=1)
df = df.dropna(subset=['COL1', 'COL2'], how='all')  # PySpark has no 'inplace' argument; reassign the result instead
# -------------------------------------------------------------------------------
# Return df column names and data types
# -------------------------------------------------------------------------------
df.dtypes
# -------------------------------------------------------------------------------
# Return the first n rows (the first 2 rows in the example below)
# -------------------------------------------------------------------------------
df.take(2)
# -------------------------------------------------------------------------------
# Choose all columns except prospect_id and cast the data type to 'float'
# -------------------------------------------------------------------------------
### Method 1
### Source: https://stackoverflow.com/questions/40478018/pyspark-dataframe-convert-multiple-columns-to-float
### -------------------------------------------------------------------------------
cols = df.columns[1:]
for col_name in cols:
    print('cast data type to float for: ', col_name)
    df = df.withColumn(col_name, col(col_name).cast('float'))
### -------------------------------------------------------------------------------
### Method 2
### Source: https://stackoverflow.com/questions/40478018/pyspark-dataframe-convert-multiple-columns-to-float
### -------------------------------------------------------------------------------
df = df.select(*(col(c).cast("float").alias(c) for c in df.columns))  # cast all columns to float!
df = df.withColumn('COL1', df['COL1'].astype('string'))  # cast a single column (here to string; astype is an alias for cast)
# -------------------------------------------------------------------------------
### Apply a function to every row in COL1 via a udf
### strip() removes whitespace from the beginning and the end of the string
### upper() returns the string in upper case
### (Spark DataFrames are immutable, so the pandas-style df['COL1'] = ... assignment does not work.)
# -------------------------------------------------------------------------------
strip_upper_udf = udf(lambda x: x.upper().strip() if x is not None else None, StringType())
df = df.withColumn('COL1', strip_upper_udf(df['COL1']))
# -------------------------------------------------------------------------------
### Drop duplicates in COL1, COL2, and COL3
# -------------------------------------------------------------------------------
df = df.drop_duplicates(['COL1', 'COL2', 'COL3'])
# -------------------------------------------------------------------------------
### Create another column by combining COL1 and COL2
# -------------------------------------------------------------------------------
df = df.withColumn('NEW_COL', concat(coalesce(col('COL1'), lit('')), lit('_'), coalesce(col('COL2'), lit(''))))
# -------------------------------------------------------------------------------
### Create another column by applying a regular expression to COL1
# -------------------------------------------------------------------------------
df = df.withColumn('NEW_COL', regexp_replace(df['COL1'], 'REG_PATTERN', ''))  # For example, the pattern [^A-Za-z0-9]+
# -------------------------------------------------------------------------------
### Joining in PySpark (by 4 columns)
# -------------------------------------------------------------------------------
df1.join(df2, [col('COL1_df1') == col('COL1_df2'),
               col('COL2_df1') == col('COL2_df2'),
               col('COL3_df1') == col('COL3_df2'),
               col('COL4_df1') == col('COL4_df2')],
         how='inner')
# -------------------------------------------------------------------------------
### Joining in PySpark (by a single column)
# -------------------------------------------------------------------------------
df1.join(df2, [col('COL1_df1') == col('COL1_df2')], how='inner')
# -------------------------------------------------------------------------------
### If-else in PySpark
#### If the condition is satisfied, then 1, otherwise 0
# -------------------------------------------------------------------------------
df = df.withColumn('NEW_COL', when(col('COL1') < col('COL2'), 1).otherwise(0))
# -------------------------------------------------------------------------------
### Filter in PySpark
### In this example, keep only the rows with COL1 > 29
# -------------------------------------------------------------------------------
df = df.filter(df['COL1'] > 29)
# -------------------------------------------------------------------------------
### Data Dimensionality in PySpark
# -------------------------------------------------------------------------------
print('-' * 150)
print('the number of rows: ', df.count())
print('the number of columns: ', len(df.columns))
print('-' * 150)
# -------------------------------------------------------------------------------
### Number of Unique Rows in COL1
# -------------------------------------------------------------------------------
df.select('COL1').distinct().count()
# -------------------------------------------------------------------------------
# How to show the full column content in a Spark DataFrame?
# https://stackoverflow.com/questions/33742895/how-to-show-full-column-content-in-a-spark-dataframe
# -------------------------------------------------------------------------------
results.show(20, False)  # truncate=False, so column values are not truncated
# -------------------------------------------------------------------------------
# Convert a Python list into a Spark DataFrame
# -------------------------------------------------------------------------------
list_of_column_names_spark = spark.createDataFrame(list_of_column_names, StringType())
list_of_column_names_spark = list_of_column_names_spark.selectExpr("value as col_name")
display(list_of_column_names_spark)
# -------------------------------------------------------------------------------
# How to rename a column in PySpark? (see the sketch below)
# https://stackoverflow.com/questions/34077353/how-to-change-dataframe-column-names-in-pyspark
# -------------------------------------------------------------------------------
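# A minimal sketch (added illustration; 'COL1' and 'NEW_COL1' are placeholder names):
df = df.withColumnRenamed('COL1', 'NEW_COL1')
# To rename every column at once, pass a full list of new names to toDF:
# df = df.toDF(*['NEW_COL1', 'NEW_COL2'])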
# -------------------------------------------------------------------------------
# PySpark: convert a standard Python list to a DataFrame (see the sketch below;
# the StringType() example above is another option)
# https://stackoverflow.com/questions/48448473/pyspark-convert-a-standard-list-to-data-frame/48452100
# -------------------------------------------------------------------------------
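# A minimal sketch (added illustration; the data and column names are made up):
# a list of tuples becomes a DataFrame with named columns.
pairs = [(1, 'a'), (2, 'b'), (3, 'c')]
df_from_list = spark.createDataFrame(pairs, ['id', 'letter'])
df_from_list.show()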
Hi, is there any way to add double quotes to a string from a dataframe, basically to make it a valid JSON string?
+------------------------+---------+-------------------------------------------------------------------------------------------+
|time                    |IP       |JSON                                                                                       |
+------------------------+---------+-------------------------------------------------------------------------------------------+
|2020-09-24T08:03:01.633Z|10.1.20.1|{"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}|
Which function can be used to convert the "Keys" value (-9serverkey) into a proper JSON string, i.e. it should become
"Keys":"-9serverkey"
I tried this:
df.withColumn("JSON", F.regexp_replace(F.col("JSON"), r'"Keys":([-][^,]+)', '"Keys":"$1"'))
but it is not converting the value to a string.
It would be appreciated if you could let us know which function can be applied here.
Thanks
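A possible direction (a sketch only, not a confirmed fix for this exact data): Spark's regexp_replace accepts Java-style group references such as $1 in the replacement string, and the result must be assigned back to a DataFrame because DataFrames are immutable.
from pyspark.sql import functions as F
# Hypothetical sketch: capture the unquoted value after "Keys": and wrap the captured group in double quotes.
df = df.withColumn("JSON", F.regexp_replace(F.col("JSON"), r'"Keys":(-[^,}]+)', '"Keys":"$1"'))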