print(sc)
print(spark)
# <pyspark.sql.session.SparkSession at 0x7f8df8673ba8>
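# -------------------------------------------------------------------------------
# (Added sketch, not from the original gist.) The prints above assume 'sc' and
# 'spark' already exist, as in Databricks or the pyspark shell. If they do not,
# a minimal way to create them is shown below; the app name is just a placeholder.
# -------------------------------------------------------------------------------
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pyspark-cheatsheet').getOrCreate()
sc = spark.sparkContext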
# -------------------------------------------------------------------------------
# Import PySpark Libraries
# -------------------------------------------------------------------------------
import math
import re
import pandas as pd
import numpy as np
import datetime
from pyspark.sql.functions import skewness, kurtosis, var_pop, var_samp, stddev, stddev_pop, sumDistinct, ntile
from pyspark.sql.functions import split, explode, substring, upper, trim, lit, length, regexp_replace, col, when, desc, concat, coalesce, countDistinct, expr, udf
# 'udf' stands for 'user-defined function': a wrapper that lets a plain Python function
# be applied to a PySpark DataFrame column. Its use should become clearer below.
from pyspark.sql.types import IntegerType, StringType, DateType, StructField, StructType
from pyspark.sql import DataFrame, Row
from functools import reduce
# -------------------------------------------------------------------------------
# Please refer to: http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html for full reference.
# -------------------------------------------------------------------------------
# How to select columns?
# -------------------------------------------------------------------------------
# Assume that you have a pre-loaded DataFrame called 'df'
# Please replace 'COL1' and 'COL2'
df.select(['COL1', 'COL2'])
# -------------------------------------------------------------------------------
# How to trim the whitespace?
# How to uppercase the elements in the feature?
# -------------------------------------------------------------------------------
df.withColumn('COL1', trim(upper(df['COL1'])))
# -------------------------------------------------------------------------------
# How to filter for non-null rows in the feature?
# -------------------------------------------------------------------------------
df.where(df['COL1'].isNotNull())
# Note: df.where(df['COL1'] != 'NaN') only helps if missing values were stored as the literal string 'NaN'
# -------------------------------------------------------------------------------
# How to drop missing rows from the dataframe?
# dropna(): returns a new DataFrame omitting rows with null values.
# -------------------------------------------------------------------------------
df.dropna()
# -------------------------------------------------------------------------------
# split: splits str around pattern (pattern is a regular expression). In this case, by ';'
# explode: returns a new row for each element in the given array or map.
# toDF: returns a new DataFrame with the specified column names.
# -------------------------------------------------------------------------------
df.select(df['COL1'], explode(split(df['COL2'], ';'))).toDF(*['COL1', 'COL2'])
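# (Added sketch, not from the original gist.) A toy DataFrame to illustrate what
# split + explode produce; the data and column names here are made up for the example.
toy_df = spark.createDataFrame([('a', 'x;y;z')], ['COL1', 'COL2'])
toy_df.select(toy_df['COL1'], explode(split(toy_df['COL2'], ';'))).toDF('COL1', 'COL2').show()
# Expected result: three rows ('a', 'x'), ('a', 'y'), ('a', 'z')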
# -------------------------------------------------------------------------------
# lit: creates a Column of literal value.
# -------------------------------------------------------------------------------
df.withColumn('COL1', lit(1))
# -------------------------------------------------------------------------------
# dropna: returns a new DataFrame omitting rows with null values. DataFrame.dropna() and
# DataFrameNaFunctions.drop() are aliases of each other.
# Parameters:
# how - 'any' or 'all'. If 'any', drop a row if it contains any nulls. If 'all', drop a row only if all its values are null.
# thresh - int, default None. If specified, drop rows that have fewer than thresh non-null values. This overwrites the how parameter.
# subset - optional list of column names to consider.
# -------------------------------------------------------------------------------
df.dropna(subset=['COL1'], how='any', thresh=1)
df = df.dropna(subset=['COL1', 'COL2'], how='all')  # note: PySpark dropna has no 'inplace' argument; reassign the result
# -------------------------------------------------------------------------------
# Return df column names and data types
# -------------------------------------------------------------------------------
df.dtypes
# -------------------------------------------------------------------------------
# Return the first n rows (the first 2 rows in the example below)
# -------------------------------------------------------------------------------
df.take(2)
# --------------------------------------------------------------------------
# Choose all columns except the first (e.g. prospect_id) and cast their data type to 'float'
# --------------------------------------------------------------------------
### Method 1
### Source: https://stackoverflow.com/questions/40478018/pyspark-dataframe-convert-multiple-columns-to-float
### --------------------------------------------------------------------------
cols = df.columns[1:]
for col_name in cols:
    print('cast data type to float for: ', col_name)
    df = df.withColumn(col_name, col(col_name).cast('float'))
### --------------------------------------------------------------------------
### Method 2
### Source: https://stackoverflow.com/questions/40478018/pyspark-dataframe-convert-multiple-columns-to-float
### --------------------------------------------------------------------------
df = df.select(*(col(c).cast("float").alias(c) for c in df.columns))  # cast all columns to float
df = df.withColumn('COL1', df['COL1'].astype('string'))               # cast a single column to string
# -------------------------------------------------------------------------------
### Apply a function to every row in COL1 via a udf (the udf mentioned in the imports above)
### The strip() method removes whitespace from the beginning and the end
### The upper() method returns the string in upper case
# -------------------------------------------------------------------------------
upper_strip = udf(lambda x: x.strip().upper() if x is not None else None, StringType())
df = df.withColumn('COL1', upper_strip(df['COL1']))
# -------------------------------------------------------------------------------
### Drop duplicates in COL1, COL2, and COL3
# -------------------------------------------------------------------------------
df = df.drop_duplicates(['COL1', 'COL2', 'COL3'])
# -------------------------------------------------------------------------------
### Create another column by combining COL1 and COL2
# -------------------------------------------------------------------------------
df = df.withColumn('NEW_COL', concat(coalesce(col('COL1'), lit('')), lit('_'), coalesce(col('COL2'), lit(''))))
# -------------------------------------------------------------------------------
### Create another column by applying a regular expression to COL1
# -------------------------------------------------------------------------------
df = df.withColumn('NEW_COL', regexp_replace(df['COL1'], 'REG_PATTERN', ''))  # For example, the pattern could be [^A-Za-z0-9]+
# -------------------------------------------------------------------------------
### Joining in PySpark (by 4 columns)
# -------------------------------------------------------------------------------
df1.join(df2, [col('COL1_df1') == col('COL1_df2'),
               col('COL2_df1') == col('COL2_df2'),
               col('COL3_df1') == col('COL3_df2'),
               col('COL4_df1') == col('COL4_df2')],
         how='inner')
# -------------------------------------------------------------------------------
### Joining in PySpark (by a single column)
# -------------------------------------------------------------------------------
df1.join(df2, [col('COL1_df1') == col('COL1_df2')], how='inner')
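# (Added note, not from the original gist.) When the join key has the same name in both
# DataFrames, passing the column name directly avoids a duplicated key column in the result;
# 'COL1' here is just a placeholder.
df1.join(df2, on='COL1', how='inner')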
# -------------------------------------------------------------------------------
### If-else in PySpark
#### If the condition is satisfied, then 1, otherwise 0
# -------------------------------------------------------------------------------
df = df.withColumn('NEW_COL', when(col('COL1') < col('COL2'), 1).otherwise(0))
# -------------------------------------------------------------------------------
### Filter in PySpark
### In this example, keep only rows with COL1 > 29
# -------------------------------------------------------------------------------
df = df.filter(df['COL1'] > 29)
# -------------------------------------------------------------------------------
### Data Dimensionality in PySpark
# -------------------------------------------------------------------------------
print('-' * 150)
print('the number of rows: ', df.count())
print('the number of columns: ', len(df.columns))
print('-' * 150)
# -------------------------------------------------------------------------------
### Number of Unique Rows in COL1
# -------------------------------------------------------------------------------
df.select('COL1').distinct().count()
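# (Added sketch, not from the original gist.) An alternative using countDistinct,
# which is already imported above; note countDistinct ignores nulls, whereas
# distinct().count() counts null as one distinct value.
df.agg(countDistinct('COL1')).show()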
Disclosure statement: [NAME] does not work for or receive funding from any company or organization that would benefit from this article. The views expressed here are personal and not endorsed by any university or company.
How to show full column content in a Spark Dataframe?
https://stackoverflow.com/questions/33742895/how-to-show-full-column-content-in-a-spark-dataframe
results.show(20, False) will not truncate.
# -------------------------------------------------------------------------------
# Convert the list into a Spark DataFrame
# -------------------------------------------------------------------------------
list_of_column_names_spark = spark.createDataFrame(list_of_column_names, StringType())
list_of_column_names_spark = list_of_column_names_spark.selectExpr("value as col_name")
display(list_of_column_names_spark)
How to rename in PySpark?
https://stackoverflow.com/questions/34077353/how-to-change-dataframe-column-names-in-pyspark
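For example (added sketch, with placeholder column names): df = df.withColumnRenamed('OLD_COL', 'NEW_COL') renames a single column without touching the others.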
Pyspark convert a standard list to data frame
https://stackoverflow.com/questions/48448473/pyspark-convert-a-standard-list-to-data-frame/48452100
Hi, is there any way to add double quotes to a string from a DataFrame, basically to make it a valid JSON string?
+------------------------+---------+-------------------------------------------------------------------------------------------+
|time |IP |JSON |
+------------------------+---------+-------------------------------------------------------------------------------------------+
|2020-09-24T08:03:01.633Z|10.1.20.1|{"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}|
Which function can be used here to convert the "Keys" value (-9serverkey) to a proper JSON string? I.e. it should convert to
"Keys":"-9serverkey"
I tried this:
df.withColumn("JSON", F.regexp_replace(F.col("JSON"), r'"Keys":([-][^,]+)', '"Keys":"$1"'))
but it is not converting it to a string.
It would be appreciated if you could let us know which function can be applied here.
Thanks
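(Added sketch, not part of the original thread.) One possible approach, assuming the "Keys" value always starts with '-' and ends at the next comma: Spark's regexp_replace supports Java-style group references, so '$1' inserts the captured text. Also note that withColumn returns a new DataFrame rather than modifying df in place, so the result must be reassigned:
df = df.withColumn("JSON", F.regexp_replace(F.col("JSON"), r'"Keys":(-[^,]+)', '"Keys":"$1"'))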
Further resources are posted in the comments below. Please see them for more information.