# See also: https://phabricator.wikimedia.org/T184479#4168727
# $ pyspark2 --master yarn

# Raw JSON event data is imported into paths like this.
# This will select all data for June.
data_path = "/wmf/data/raw/event/eqiad_mediawiki_revision-score/hourly/2018/06/*/*"

# The raw form is a Hadoop sequence file. Read the JSON string out of the tuple.
json_lines = sc.sequenceFile(data_path).map(lambda x: x[1])
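# One possible next step (a sketch, assuming the SparkSession is available as
# `spark` in the pyspark2 shell): let Spark infer a schema from the JSON
# strings and inspect it.
json_df = spark.read.json(json_lines)
json_df.printSchema()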
drop table MobileWikiAppiOSSessions;
drop table MobileWikiAppiOSUserHistory;
drop table MobileWikiAppiOSLoginAction;
drop table MobileWikiAppiOSSettingAction;
drop table MobileWikiAppiOSReadingLists;

sudo -u hdfs hdfs dfs -rm -r /wmf/data/raw/eventlogging/eventlogging_MobileWikiAppiOSSessions
sudo -u hdfs hdfs dfs -rm -r /wmf/data/raw/eventlogging/eventlogging_MobileWikiAppiOSUserHistory
sudo -u hdfs hdfs dfs -rm -r /wmf/data/raw/eventlogging/eventlogging_MobileWikiAppiOSLoginAction
sudo -u hdfs hdfs dfs -rm -r /wmf/data/raw/eventlogging/eventlogging_MobileWikiAppiOSSettingAction
/**
 * This checks that the two columns are compatible. Here, compatible means that:
 * - Their names are resolvable (usually only case differences)
 * - AND
 * - They are either:
 *   -- The same dataType
 *   -- StructTypes, for which common sub fields are all (recursively) compatible.
 *
 * @param ours our schema
 * @param theirs their schema
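// A rough sketch (not the refinery implementation) of the rule described
// above, written against Spark's StructField/StructType and comparing field
// names case-insensitively. Names and signature here are illustrative only.
import org.apache.spark.sql.types.{StructField, StructType}

def fieldsCompatible(ours: StructField, theirs: StructField): Boolean = {
  // Names are "resolvable" here means they differ at most in case (assumption).
  val namesResolvable = ours.name.equalsIgnoreCase(theirs.name)
  val typesCompatible = (ours.dataType, theirs.dataType) match {
    // Identical dataTypes are trivially compatible.
    case (a, b) if a == b => true
    // For structs, only the sub fields both sides share must be (recursively) compatible.
    case (a: StructType, b: StructType) =>
      val theirsByName = b.fields.map(f => f.name.toLowerCase -> f).toMap
      a.fields.forall { f =>
        theirsByName.get(f.name.toLowerCase).forall(fieldsCompatible(f, _))
      }
    case _ => false
  }
  namesResolvable && typesCompatible
}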
############################# Consumer Basics #############################
# Kafka Consumer group id
group.id=kafka-mirror-main-eqiad_to_jumbo-eqiad

partition.assignment.strategy=roundrobin

# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
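# For illustration only (the hosts below are placeholders, not the real
# cluster), the property the comment above describes would look like:
#   zookeeper.connect=zk1001:2181,zk1002:2181,zk1003:2181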
import json
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
from pprint import pprint


def get_committed_offsets(consumer, partitions):
    # Return (TopicPartition, committed offset) pairs for this consumer's group.
    return [(tp, consumer.committed(tp)) for tp in partitions]
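# Example usage (a sketch; the broker and topic names below are placeholders,
# not real cluster values):
consumer = KafkaConsumer(group_id='example-group',
                         bootstrap_servers=['localhost:9092'])
partitions = [TopicPartition('example_topic', p)
              for p in consumer.partitions_for_topic('example_topic')]
pprint(get_committed_offsets(consumer, partitions))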
def f1(s1: String, s2: String, s3: String) = println(s"f1: $s1, $s2, $s3")
def f2(s1: String, s2: String) = println(s"f2: $s1, $s2")

case class Bigger(
  s1: String = "AAAA",
  s2: String = "AAAA",
  s3: String = "AAAA"
)
spark-submit \
  --driver-java-options='-Djsonrefine.log.level=DEBUG' \
  --class org.wikimedia.analytics.refinery.job.refine.JsonRefine \
  ./refinery-job/target/refinery-job-0.0.58-SNAPSHOT.jar \
  --input-base-path /wmf/data/raw/eventlogging \
  --database otto \
  --output-base-path /user/otto/external/event05 \
  --input-regex '.*eventlogging_(.+)/hourly/(\d+)/(\d+)/(\d+)/(\d+)' \
  --input-capture 'table,year,month,day,hour' \
  --table-blacklist '^Edit|ChangesListHighlights$' \
  --ignore-failure-flag \
  --since 2018-02-13T00:00:00 \
  --limit 1 \
  --transform-functions 'org.wikimedia.analytics.refinery.job.refine.deduplicate_eventlogging,org.wikimedia.analytics.refinery.job.refine.geocode_ip'
from kafka import KafkaConsumer
import json
from pprint import pprint

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer(
    'eventlogging_TestSearchSatisfaction2',
    group_id='otto1',
    bootstrap_servers=['kafka1012:9092']
)
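# A minimal consumption loop (sketch), assuming the topic's message values are
# UTF-8 encoded JSON strings:
for message in consumer:
    event = json.loads(message.value.decode('utf-8'))
    pprint(event)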
it should "convert with case classes and null" in {
  case class BA(
    b1: String = "b1"
  )
  case class BB(
    b2: String = "b2",
    b1: String = "b1"
  )
Driver stacktrace:
18/02/14 19:12:04 WARN ExecutorAllocationManager: No stages are running, but numRunningTasks != 0
18/02/14 19:12:04 INFO DAGScheduler: Job 3 failed: insertInto at SparkJsonToHive.scala:186, took 0.537105 s
18/02/14 19:12:04 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 46, localhost, executor driver): TaskKilled (killed intentionally)
18/02/14 19:12:04 ERROR JsonRefine: Failed refinement of JSON dataset hdfs://analytics-hadoop/user/otto/el2/eventlogging_MobileWikiAppSessions/hourly/2018/02/01/01 -> MobileWikiAppSessions (year=2018,month=2,day=1,hour=1).
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 1 times, most recent failure: Lost task 1.0 in stage 3.0 (TID 47, localhost, executor driver): scala.MatchError: null
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeCo