@ottomata
Created November 5, 2020 15:36
// Launch a spark2-shell for this script with:
//
// sudo -u analytics kerberos-run-command spark2-shell \
//     --files /etc/hive/conf/hive-site.xml,/etc/refinery/refine/refine_eventlogging_analytics.properties,/srv/deployment/analytics/refinery/artifacts/hive-jdbc-1.1.0-cdh5.10.0.jar,/srv/deployment/analytics/refinery/artifacts/hive-service-1.1.0-cdh5.10.0.jar \
//     --master yarn --deploy-mode client \
//     --jars /srv/deployment/analytics/refinery/artifacts/refinery-job.jar \
//     --driver-java-options='-Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080'
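// Ad-hoc Refine run: find raw EventLogging data for the
// ContentTranslationAbuseFilter and NewcomerTask tables imported over the
// last 24 hours and refine it into the Hive `event` database. Schemas are
// fetched remotely by EventLoggingSchemaLoader, hence the HTTP proxy
// settings in the launch command above.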
import scala.util.matching.Regex

import com.github.nscala_time.time.Imports._
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.types._

import org.wikimedia.analytics.refinery.job.refine._
import org.wikimedia.analytics.refinery.spark.connectors.DataFrameToHive
import org.wikimedia.analytics.refinery.spark.sql._
import org.wikimedia.eventutilities.core.event.EventLoggingSchemaLoader
// SparkSchemaLoader that looks up the EventLogging schema named by a
// RefineTarget's Hive table and converts it to a Spark StructType.
case class ELSchemaNameSchemaLoader() extends SparkSchemaLoader {
    val elSchemaLoader = new EventLoggingSchemaLoader()

    def elToSparkSchema(schemaName: String): StructType = {
        JsonSchemaConverter.toSparkSchema(elSchemaLoader.getEventLoggingSchema(schemaName))
    }

    def loadSchema(target: RefineTarget): Option[StructType] = {
        // The EventLogging schema name is the unqualified, unquoted Hive table name.
        val schemaName = target.tableName.split('.').last.replaceAll("`", "")
        Some(elToSparkSchema(schemaName))
    }
}
// Transform functions applied to each DataFrame before writing to Hive:
// keep only events from allowed domains, then apply the standard event transforms.
val transformFunctions: Seq[DataFrameToHive.TransformFunction] = Seq(
    org.wikimedia.analytics.refinery.job.refine.filter_allowed_domains.apply,
    org.wikimedia.analytics.refinery.job.refine.event_transforms.apply
)
// Write refined output as snappy-compressed Parquet.
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
// Find hourly raw eventlogging import partitions that need refinement.
val targets = RefineTarget.find(
    spark,
    // Raw input and refined output base paths.
    new Path("/wmf/data/raw/eventlogging"),
    new Path("/wmf/data/event"),
    // Output Hive database.
    "event",
    // Hourly partition directory layout under each input dataset path.
    DateTimeFormat.forPattern("'hourly'/yyyy/MM/dd/HH"),
    // Capture the table name and partition values from input paths.
    new Regex(
        "eventlogging_(.+)/hourly/(\\d+)/(\\d+)/(\\d+)/(\\d+)",
        "table", "year", "month", "day", "hour"
    ),
    // Since/until: the last 24 hours, up to (but not including) the current hour.
    DateTime.now - 24.hours,
    DateTime.now - 1.hours,
    new ELSchemaNameSchemaLoader()
)
// Keep only targets for the ContentTranslationAbuseFilter and NewcomerTask tables.
val targetsToRefine = targets.filter(_.shouldRefine(Some("(ContentTranslationAbuseFilter|NewcomerTask)".r), None, true, false))
println(s"Will refine ${targetsToRefine.size} targets")
// Refine each target, applying the transform functions above.
val results = Refine.refineTargets(
    spark,
    targetsToRefine,
    transformFunctions,
    Map(),
    true
)
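// A minimal, hypothetical follow-up to inspect the outcome. The exact result
// type returned by Refine.refineTargets is defined in refinery-job, so treat
// this as a sketch and adjust to the actual API:
println(s"Refine produced ${results.size} results")
results.foreach(println)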