Skip to content

Instantly share code, notes, and snippets.

View 1ambda's full-sized avatar
🦁
in the jungle

Kun 1ambda

🦁
in the jungle
View GitHub Profile
# df.rdd.id() 실행 결과
<bound method RDD.id of MapPartitionsRDD[25] at javaToPython at NativeMethodAccessorImpl.java:0>
# dfSelected.rdd.id() 실행 결과
<bound method RDD.id of MapPartitionsRDD[31] at javaToPython at NativeMethodAccessorImpl.java:0>
dfSelected.printSchema() # 스키마를 확인합니다.
dfSelected.describe().show() # 통계 정보를 확인합니다. PySpark 에서는 `show` 대신 `toPandas` 를 활용할 수 있습니다.
# printSchema() 의 출력 결과
root
|-- id: integer (nullable = true)
|-- year_birth: integer (nullable = true)
|-- education: string (nullable = true)
|-- count_kid: integer (nullable = true)
|-- count_teen: integer (nullable = true)
dfConverted1 = dfSelected\
withColumn("count_children", coalesce("count_kid", lit(0)) + coalesce("count_teen", lit(0)))
dfConverted1\
.select(col("id"), col("count_kid"), col("count_teen"), col("count_children"))\
.limit(5)\
.show()
# `show() 출력 결과
+----+---------+----------+--------------+
# 이 작업에서는 컬럼 이름을 가공하지 않으므로, `select` 내에서 `col` 함수를 사용하지 않았습니다.
# Spark 는 이와 같이 API 에서 다양한 형태로 사용자의 편의성을 지원합니다.
dfConverted1\
.select("education")\
.distinct()\
.show()
# `show()` 출력 결과
+----------+
| education|
educationInvalid = '2n Cycle'
educationDefault = 'NONE'
# 다음 SQL 구문과 동일합니다.
#
# SELECT CASE WHEN education = '2n Cycle' THEN 'NONE' ELSE education as education
#
dfConverted2 = dfConverted1.withColumn(
"education",
dfConverted3 = dfConverted1\
.selectExpr("*",
f"CASE WHEN education == '{educationInvalid}' THEN '{educationDefault}' ELSE education END as education"
)
# 기존 date_customer 컬럼의 값과 비교를 위해 `date_joined` 라는 다른 이름으로 컬럼 값 변환 결과를 저장합니다
# 1. 이 과정에서 `to_date` 함수를 사용해 타입을 변경하고
# 2. `add_months` 함수를 통해 72개월 (= 6년) 을 기존 값에 추가했습니다.
dfWithJoined = dfConverted2.withColumn("date_joined", add_months(to_date(col("date_customer"), "d-M-yyyy"), 72))
dfWithJoined.select("date_customer", "date_joined").limit(5).show()
dfWithJoined.printSchema()
# `show()` 출력 결과
{
"device_id": 198164,
"device_name": "sensor-pad-198164owomcJZ",
"ip": "80.55.20.25",
"cca2": "PL",
"cca3": "POL",
"cn": "Poland",
"latitude": 53.08,
"longitude": 18.62,
"scale": "Celsius",
case class DeviceIoTData (
battery_level: Long,
c02_level: Long,
cca2: String,
cca3: String,
cn: String,
device_id: Long,
device_name: String,
humidity: Long,
ip: String,
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
// 만약 Databricks 노트북을 사용한다면 경로를
// `/FileStore/tables/marketing_campaign.csv"` 로 변경할 수 있습니다.
val df = spark.read
.format("csv")
.option("format", "csv")
.option("sep", "\t")
.option("inferSchema", "true")