Andrei Tupitcyn (andr83)

import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}

// Kafka 0.10 direct stream (the 0.8 decoder-based API has no CanCommitOffsets).
val stream = KafkaUtils.createDirectStream[String, String](
  job.ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
stream.foreachRDD { rdd =>
  import sparkSession.implicits._
  // Capture offsets first: only the unmodified stream RDD implements HasOffsetRanges.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val df = rdd.map(r => (r.key, r.value)).toDF("key", "value")
  // "sequence" presumes a data source registered under that name in this project.
  df.write.format("sequence").mode("append").save(s"$workDir/$targetFile")
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) // commit only after the write
}
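
For context, a sketch of the kafkaParams and topics the snippet above assumes; every value here is a placeholder, and auto-commit is disabled because offsets are committed manually via commitAsync.

import org.apache.kafka.common.serialization.StringDeserializer

// Hypothetical consumer settings; broker address, group id and topic are made up.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean) // offsets committed via commitAsync instead
)
val topics = Seq("events")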
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Drop the key from the written records; only the value goes into the file.
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  // Route each record to a file named after its key.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String]
}
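
A minimal usage sketch for this output format, assuming a SparkContext sc and made-up sample data: each distinct key becomes a separate file under the output directory, and because generateActualKey returns NullWritable the files contain only the values.

import org.apache.spark.rdd.RDD

val records: RDD[(String, String)] = sc.parallelize(Seq(
  ("2018-01-01", "event a"),  // written to a file named 2018-01-01
  ("2018-01-02", "event b"))) // written to a file named 2018-01-02
records.saveAsHadoopFile(
  "/tmp/events", // placeholder output directory
  classOf[String],
  classOf[String],
  classOf[RDDMultipleTextOutputFormat])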
andr83 / table.sql
Created February 1, 2018 22:53
How to work with Hive tables with a lot of partitions from Spark
CREATE TABLE events (
  created TIMESTAMP,
  `user` STRING,
  time STRING,
  request STRING,
  status STRING,
  size STRING,
  referer STRING
) PARTITIONED BY (   -- Hive requires PARTITIONED BY, not PARTITION BY
  year INT,
  month INT,         -- month/day assumed: the original gist is cut off after "year INT"
  day INT
);
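
From Spark the table is usually filled with a dynamic partition insert. A sketch, assuming a Hive-enabled SparkSession and a DataFrame df whose last three columns are year, month and day (insertInto matches columns by position, with partition columns last):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  // allow every partition column to be resolved dynamically at write time
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .getOrCreate()

df.write.mode("append").insertInto("events")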
import scala.collection.immutable.TreeSet

case class T(x: Int, y: Int)

// Order by x descending: elements with the same x compare as equal.
val tOrdering = Ordering.fromLessThan[T](_.x > _.x)
var tree = TreeSet.empty(tOrdering)
tree = tree + T(10, 2)
tree = tree + T(1, 2)
tree = tree + T(1, 3) // not added: compares equal to T(1, 2) under tOrdering
tree = tree + T(5, 3)
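
Printing the set makes the effect of the ordering visible; expected output is noted in the comments, assuming the snippet above has run as written.

println(tree)
// expected: TreeSet(T(10,2), T(5,3), T(1,2))
// T(1,3) is absent: the ordering compares only x, so it counts as a duplicate of T(1,2)
println(tree.contains(T(1, 99))) // true: any T with x == 1 matches under this ordering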
(Output of V8's tick processor, typically produced by running node --prof and post-processing the resulting v8.log with node --prof-process.)

Statistical profiling result from v8.log, (5361 ticks, 46 unaccounted, 0 excluded).

 [Unknown]:
   ticks  total  nonlib   name
      46   0.9%

 [Shared libraries]:
   ticks  total  nonlib   name
    3405  63.5%    0.0%  /usr/local/bin/node
    1322  24.7%    0.0%  /usr/lib/system/libsystem_kernel.dylib