# dfConverted.select("education").limit(5).explain("extended")
== Analyzed Logical Plan ==
education: string
GlobalLimit 5
+- LocalLimit 5
   +- Project [education#238]
      +- Project [id#236, year_birth#237, education#238, count_kid#239, count_teen#240, date_customer#241, days_last_login#242, add_months(to_date('date_customer, Some(d-M-yyyy)), 72) AS date_joined#257]
         +- Project [ID#16 AS id#236, Year_Birth#17 AS year_birth#237, Education#18 AS education#238, Kidhome#21 AS count_kid#239, Teenhome#22 AS count_teen#240, Dt_Customer#23 AS date_customer#241, Recency#24 AS days_last_login#242]
            +- Relation[ID#16,Year_Birth#17,Education#18,Marital_Status#19,Income#20,Kidhome#21,Teenhome#22,Dt_Customer#23,Recency#24,MntWines#25,MntFruits#26,MntMeatProducts#27,MntFishProducts#28,MntSweetProducts#29,MntGoldProds#30,NumDealsPurchases#31,NumWebPurchases#32,NumCatalogPurchases#33,NumStorePurchases#34,NumWebVisitsMonth#35,AcceptedCmp3#36,AcceptedCmp4#37,AcceptedCmp5#38,AcceptedCmp1#39,... 5 more fields]
df.explain("cost")
df.explain("codegen")
spark.sparkContext._conf.get('spark.default.parallelism')
200 # Output; depending on your Spark configuration this may not be 200.
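# spark.default.parallelism can also be set explicitly when the SparkSession is first created
# (before any session exists). A minimal sketch; the value "8" is an arbitrary example,
# not taken from the snippet above.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("parallelism-example")
         .config("spark.default.parallelism", "8")
         .getOrCreate())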
print(f"Partition Count of Dataframe df:\t\t{df.rdd.getNumPartitions()}")
print(f"Partition Count of Dataframe dfSelected:\t{dfSelected.rdd.getNumPartitions()}")
print(f"Partition Count of Dataframe dfConverted:\t{dfConverted.rdd.getNumPartitions()}")
# Output
Partition Count of Dataframe df: 1
Partition Count of Dataframe dfSelected: 1
Partition Count of Dataframe dfConverted: 1
# Increase the partition count from 1 to 5 using repartition().
dfPartitioned = dfConverted.repartition(5)
print(f"Partition Count of Dataframe dfPartitioned:\t{dfPartitioned.rdd.getNumPartitions()}")
Partition Count of Dataframe dfPartitioned: 5
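# To reduce the partition count without a full shuffle, coalesce() can be used instead of repartition().
# A minimal sketch; the variable name dfCoalesced and the target count 2 are illustrative.
dfCoalesced = dfPartitioned.coalesce(2)
print(f"Partition Count of Dataframe dfCoalesced:\t{dfCoalesced.rdd.getNumPartitions()}")  # 2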
# repartition() also accepts column expressions; rows with the same id are hash-partitioned into the same partition.
dfConverted.repartition(col("id"))
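# A target partition count and a partitioning column can also be combined.
# A minimal sketch; the count 10 is an arbitrary example.
dfConverted.repartition(10, col("id"))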
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Row
# If you are following along on Databricks, change the path to "/FileStore/tables/marketing_campaign.csv"
df = spark.read.load("./marketing_campaign.csv",
                     format="csv",
                     sep="\t",
                     inferSchema="true",
                     header="true")
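# To verify that the tab separator and inferSchema worked as expected, the schema can be inspected.
# This check is an added suggestion, not part of the original snippet.
df.printSchema()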
# 'collect()' is an Action that reads the data in each partition on the Executors and sends it to the Driver.
# If the data has been cached (e.g. via cache()), it can be served from memory instead.
collected = dfPartitioned.collect()
# Result of type(collected)
list
# Result of collected[0]
Row(id=7196, year_birth=1950, education='PhD', count_kid=1, count_teen=1, date_customer='08-02-2014', days_last_login=20, date_joined=datetime.date(2020, 2, 8))
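# Since collect() pulls every row to the Driver, it can exhaust Driver memory on large DataFrames.
# take() is a lighter-weight way to fetch just a few rows; an added suggestion, not in the original snippet.
sampled = dfPartitioned.take(3)  # a list of at most 3 Row objects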
from pyspark.sql import Row
missing_days = 10
# Spark ์˜ Row ๋Š” read-only ์ž…๋‹ˆ๋‹ค. ๋”ฐ๋ผ์„œ Python ์—์„œ ๋ณ€๊ฒฝํ•˜๊ธฐ ์œ„ํ•ด Dict ๋กœ ๋ณ€๊ฒฝ ํ›„ ๋‹ค์‹œ Row ๋กœ ๋˜๋Œ๋ฆฝ๋‹ˆ๋‹ค.
# ํšจ์œจ์ ์ธ ๋ฐฉ๋ฒ•์ด ์•„๋‹ˆ๋ฉฐ, ๋‚ด๋ถ€ ๋™์ž‘์˜ ์ดํ•ด๋ฅผ ๋•๊ธฐ ์œ„ํ•ด ๋งŒ๋“  ์ฝ”๋“œ์ž…๋‹ˆ๋‹ค.
def updateDaysLastLogin(row):
parsed = row.asDict()
parsed['days_last_login'] = parsed['days_last_login'] + missing_days
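# A sketch of how this function might be applied: map it over the underlying RDD and rebuild a DataFrame.
# The variable names below are illustrative and not taken from the original snippet.
updatedRdd = dfPartitioned.rdd.map(updateDaysLastLogin)
dfUpdated = spark.createDataFrame(updatedRdd)
dfUpdated.select("days_last_login").show(3)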
spark.driver.cores  # number of CPU cores the Driver will use
spark.driver.memory # amount of memory (GiB) the Driver will use
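# These are Spark configuration properties rather than Python attributes; they are usually set at submit time
# (e.g. spark-submit --driver-memory 4g) or when the SparkSession is first created.
# A minimal sketch; the values "2" and "4g" are arbitrary examples. Note that driver memory generally
# must be set before the Driver JVM starts, so changing it on an already-running session has no effect.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.driver.cores", "2")
         .config("spark.driver.memory", "4g")
         .getOrCreate())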