Skip to content

Instantly share code, notes, and snippets.

@nuria
Last active September 24, 2020 22:20
Show Gist options
  • Save nuria/cf08ad21ba46e70a21509eb989b43429 to your computer and use it in GitHub Desktop.
import org.apache.spark.sql.{ Encoders, SaveMode }
import org.apache.spark.sql.functions.{ lit, struct }
import org.apache.spark.sql.types.{ LongType, StringType }
// Input glob: every hourly partition of the raw PrefUpdate event data on HDFS.
// (Plain string literal — the `s` interpolator was unnecessary: nothing is interpolated.)
val readPath = "/wmf/data/event/PrefUpdate/year=*/month=*/day=*/hour=*/*.parquet"
// SQL predicate keeping only the whitelisted preference properties;
// rows for any other `event.property` value are dropped entirely.
val propertyWhitelistFilter = "event.property in ('skin', 'mfMode', 'mf_amc_optin', 'VectorSkinVersion', 'popups', 'popupsreferencepreviews', 'discussiontools-betaenable', 'betafeatures-auto-enroll' , 'echo-notifications-blacklist', 'email-blacklist', 'growthexperiments-help-panel-tog-help-panel', 'growthexperiments-homepage-enable', 'growthexperiments-homepage-pt-link')"
// Parsed user-agent struct carried on each event.
// All component fields are Strings; the two Option[Boolean] flags mark
// bot / MediaWiki-internal traffic. In sanitized output every field is
// nulled (see PrefUpdate.sanitize below).
case class UserAgent(
browser_family: String,
browser_major: String,
browser_minor: String,
device_family: String,
is_bot: Option[Boolean],
is_mediawiki: Option[Boolean],
os_family: String,
os_major: String,
os_minor: String,
wmf_app_version: String
)
// The PrefUpdate event payload: which preference was set, to what, by whom.
// bucketedUserEditCount defaults to None because older partitions of the
// data do not carry that field.
case class Event(
isDefault: Boolean,
property: String,
saveTimestamp: String,
userId: Option[Long],
value: String,
version: String,
bucketedUserEditCount: Option[String] = None
)
// One raw PrefUpdate event row as read from parquet, plus the Hive partition
// columns (year/month/day/hour). The middle defaulted fields (ip,
// geocoded_data, topic) are optional because older partitions lack them —
// note this forces callers to name the trailing partition fields when
// relying on those defaults.
case class PrefUpdate(
  dt: String,
  event: Event,
  recvfrom: String,
  revision: Long,
  schema: String,
  seqid: Long,
  useragent: UserAgent,
  uuid: String,
  webhost: String,
  wiki: String,
  ip: Option[String] = None,
  geocoded_data: Option[Map[String, String]] = None,
  topic: Option[String] = None,
  year: Long,
  month: Long,
  day: Long,
  hour: Long
) {
  /** Returns a copy of this event with the privacy-sensitive fields blanked.
    *
    * String columns are set to `null` (Spark serializes that as SQL NULL in
    * the output table); Option-typed fields are set to `None` — the original
    * assigned `null` to Options, which serializes the same but is unsafe for
    * any in-JVM use of the value (e.g. `.map` would NPE). geocoded_data is
    * kept as an empty map rather than NULL to preserve the column's shape.
    */
  def sanitize(): PrefUpdate = {
    PrefUpdate(
      dt,
      Event(
        event.isDefault,
        event.property,
        null, // saveTimestamp: String column -> SQL NULL
        None, // userId: Option field, use None (was null)
        null, // value: String column -> SQL NULL
        event.version,
        None  // bucketedUserEditCount: Option field, use None (was null)
      ),
      recvfrom,
      revision,
      schema,
      seqid,
      // All UA components blanked; the two Option[Boolean] flags get None.
      UserAgent(null, null, null, null, None, None, null, null, null, null),
      uuid,
      webhost,
      wiki,
      None, // ip: Option field, use None (was null)
      Some(Map[String, String]()), // geocoded_data: deliberately empty map, not NULL
      topic,
      year,
      month,
      day,
      hour
    )
  }
}
// Derive the read schema from the case class so parquet columns map onto it.
val schema = Encoders.product[PrefUpdate].schema

// Read all raw partitions, keep only whitelisted properties, sanitize each
// row, and write the result back as a Hive table partitioned by time.
val sanitized = spark.read
  .schema(schema)
  .parquet(readPath)
  .where(propertyWhitelistFilter)
  .as[PrefUpdate]
  .map(_.sanitize())

sanitized.write
  .partitionBy("year", "month", "day", "hour")
  .mode(SaveMode.Overwrite)
  .saveAsTable("analytics.prefupdate_filtered")
// *** Scratch Pad ***
//
//val readPath = s"/wmf/data/event_sanitized/PrefUpdate/year=*/month=*/day=*/hour=*/*.parquet"
//
//saveAsTable("milimetric.prefupdate_sanitized_filtered")
//
// Scratch: single-hour sample paths for comparing sanitized vs. raw data.
// Plain string literals — the `s` interpolator was unnecessary (no interpolation).
val readPath = "/wmf/data/event_sanitized/PrefUpdate/year=2017/month=11/day=30/hour=10/*.parquet"
val readPathRecent = "/wmf/data/event/PrefUpdate/year=2020/month=8/day=1/hour=10/*.parquet"
// repartition(tempPartitions, col("wiki"), col("time_bucket")).
// sortWithinPartitions(col("wiki"), col("time_bucket"), col("timestamp")).
// Scratch: DataFrame-API version of sanitize — flatten the event struct,
// null out the PII columns, then rebuild the struct and inspect the schema.
df.select("*", "event.*")
  .drop("event") // was drop('event): Symbol column syntax is deprecated; use the name
  // cast needs a DataType: `cast(String)` did not compile, use StringType/LongType
  .withColumn("saveTimestamp", lit(null).cast(StringType))
  .withColumn("userId", lit(null).cast(LongType))
  .withColumn("value", lit(null).cast(StringType))
  // TODO:
  //   null useragent (struct)
  //   null ip (String)
  //   empty geocoded_data (map)
  .withColumn("event", struct("isDefault", "property", "saveTimestamp", "userId", "value", "version"))
  .drop("isDefault", "property", "saveTimestamp", "userId", "value", "version")
  .printSchema()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment