Andrew Otto (ottomata): assorted GitHub gists
# See also: https://phabricator.wikimedia.org/T184479#4168727
# $ pyspark2 --master yarn
# Raw JSON event data is imported into paths like the one below.
# The glob selects all hours of all days in June 2018.
data_path = "/wmf/data/raw/event/eqiad_mediawiki_revision-score/hourly/2018/06/*/*"
# The raw form is a Hadoop sequence file. Read the JSON string out of the tuple.
json_lines = sc.sequenceFile(data_path).map(lambda x: x[1])
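To work with these events as a DataFrame, one option (a sketch, assuming the pyspark2 shell above where `spark` is the SparkSession, and that the events carry a `database` field) is to let Spark infer a schema from the JSON strings:

# Parse the JSON strings into a DataFrame; Spark samples the data to infer a schema.
events = spark.read.json(json_lines)
events.printSchema()
# e.g. count revision-score events per wiki database (assumes a `database` field exists).
events.groupBy("database").count().show()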
-- Drop the Hive tables for these iOS app EventLogging schemas.
drop table MobileWikiAppiOSSessions;
drop table MobileWikiAppiOSUserHistory;
drop table MobileWikiAppiOSLoginAction;
drop table MobileWikiAppiOSSettingAction;
drop table MobileWikiAppiOSReadingLists;
# Remove the corresponding raw EventLogging directories from HDFS.
sudo -u hdfs hdfs dfs -rm -r /wmf/data/raw/eventlogging/eventlogging_MobileWikiAppiOSSessions
sudo -u hdfs hdfs dfs -rm -r /wmf/data/raw/eventlogging/eventlogging_MobileWikiAppiOSUserHistory
sudo -u hdfs hdfs dfs -rm -r /wmf/data/raw/eventlogging/eventlogging_MobileWikiAppiOSLoginAction
sudo -u hdfs hdfs dfs -rm -r /wmf/data/raw/eventlogging/eventlogging_MobileWikiAppiOSSettingAction
/**
 * This checks that the two columns are compatible. Here, compatible means that:
 *  - their names are resolvable (usually differing only by case)
 *  - AND they are either:
 *    - the same dataType, or
 *    - StructTypes for which all common sub-fields are (recursively) compatible.
 *
 * @param ours   our schema
 * @param theirs their schema
 */
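For illustration only, a minimal PySpark sketch of the same compatibility rule (a hypothetical helper, not the refinery implementation), resolving field names case-insensitively:

from pyspark.sql.types import StructType

def compatible(ours, theirs):
    # StructTypes are compatible when every field name they share resolves
    # (case-insensitively) to sub-fields that are themselves compatible.
    if isinstance(ours, StructType) and isinstance(theirs, StructType):
        our_fields = {f.name.lower(): f for f in ours.fields}
        their_fields = {f.name.lower(): f for f in theirs.fields}
        common = set(our_fields) & set(their_fields)
        return all(
            compatible(our_fields[n].dataType, their_fields[n].dataType)
            for n in common
        )
    # Otherwise the two dataTypes must be identical.
    return ours == theirs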
consumer.properties: MirrorMaker 0.9.0.1 -> Kafka 1.0 Cluster woes
############################# Consumer Basics #############################
# Kafka Consumer group id
group.id=kafka-mirror-main-eqiad_to_jumbo-eqiad
# Assign partitions to consumer streams round-robin (old ZooKeeper-based consumer)
partition.assignment.strategy=roundrobin
# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
import json
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
from pprint import pprint
def get_committed_offsets(consumer, partitions):
    # Pair each TopicPartition with the last offset committed by this consumer's group.
    return [(tp, consumer.committed(tp)) for tp in partitions]
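A usage sketch (group, topic, and broker names below are hypothetical placeholders):

# Inspect what a consumer group has committed for every partition of a topic.
consumer = KafkaConsumer(group_id='example-group', bootstrap_servers=['localhost:9092'])
partitions = [TopicPartition('example-topic', p)
              for p in consumer.partitions_for_topic('example-topic')]
pprint(get_committed_offsets(consumer, partitions))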
def f1(s1: String, s2: String, s3: String) = println(s"f1: $s1, $s2, $s3")
def f2(s1: String, s2: String) = println(s"f2: $s1, $s2")
case class Bigger(
  s1: String = "AAAA",
  s2: String = "AAAA",
  s3: String = "AAAA"
)
spark-submit \
  --driver-java-options='-Djsonrefine.log.level=DEBUG' \
  --class org.wikimedia.analytics.refinery.job.refine.JsonRefine \
  ./refinery-job/target/refinery-job-0.0.58-SNAPSHOT.jar \
  --input-base-path /wmf/data/raw/eventlogging \
  --database otto \
  --output-base-path /user/otto/external/event05 \
  --input-regex '.*eventlogging_(.+)/hourly/(\d+)/(\d+)/(\d+)/(\d+)' \
  --input-capture 'table,year,month,day,hour' \
  --table-blacklist '^Edit|ChangesListHighlights$' \
  --ignore-failure-flag \
  --since 2018-02-13T00:00:00 \
  --limit 1 \
  --transform-functions 'org.wikimedia.analytics.refinery.job.refine.deduplicate_eventlogging,org.wikimedia.analytics.refinery.job.refine.geocode_ip'
from kafka import KafkaConsumer
import json
from pprint import pprint
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer(
    'eventlogging_TestSearchSatisfaction2',
    group_id='otto1',
    bootstrap_servers=['kafka1012:9092']
)
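From here, a minimal consume loop (a sketch; assumes the topic carries UTF-8 encoded JSON, as EventLogging topics do):

for message in consumer:
    # message.value is raw bytes; decode and parse the EventLogging JSON.
    event = json.loads(message.value.decode('utf-8'))
    pprint(event)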
it should "convert with case classes and null" in {
case class BA(
b1: String = "b1"
)
case class BB(
b2: String = "b2",
b1: String = "b1"
)
Driver stacktrace:
18/02/14 19:12:04 WARN ExecutorAllocationManager: No stages are running, but numRunningTasks != 0
18/02/14 19:12:04 INFO DAGScheduler: Job 3 failed: insertInto at SparkJsonToHive.scala:186, took 0.537105 s
18/02/14 19:12:04 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 46, localhost, executor driver): TaskKilled (killed intentionally)
18/02/14 19:12:04 ERROR JsonRefine: Failed refinement of JSON dataset hdfs://analytics-hadoop/user/otto/el2/eventlogging_MobileWikiAppSessions/hourly/2018/02/01/01 -> MobileWikiAppSessions (year=2018,month=2,day=1,hour=1).
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 1 times, most recent failure: Lost task 1.0 in stage 3.0 (TID 47, localhost, executor driver): scala.MatchError: null
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeCo