Last active
October 5, 2019 20:55
-
-
Save asm0dey/068fb783eaf4c21cd3b14df7ee5d99ee to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@JvmField | |
val ENCODERS = mapOf<KClass<out Any>, Encoder<out Any>>( | |
Boolean::class to Encoders.BOOLEAN(), | |
Byte::class to Encoders.BYTE(), | |
Short::class to Encoders.SHORT(), | |
Int::class to Encoders.INT(), | |
Long::class to Encoders.LONG(), | |
Float::class to Encoders.FLOAT(), | |
Double::class to Encoders.DOUBLE(), | |
String::class to Encoders.STRING(), | |
BigDecimal::class to Encoders.DECIMAL(), | |
Date::class to Encoders.DATE(), | |
Timestamp::class to Encoders.TIMESTAMP(), | |
ByteArray::class to Encoders.BINARY() | |
) | |
inline fun <reified T> encoder(): Encoder<T>? = ENCODERS[T::class] as? Encoder<T>? ?: Encoders.kryo(T::class.java) | |
inline fun <reified T> SparkSession.toDS(list: List<T>): Dataset<T> = createDataset(list, encoder<T>()) | |
inline fun <T, reified R> Dataset<T>.map(crossinline func: (T) -> R): Dataset<R> = | |
map(MapFunction { func(it) }, encoder<R>()) | |
inline fun <T, reified R> Dataset<T>.flatMap(noinline func: (T) -> Iterator<R>): Dataset<R> = | |
flatMap(func, encoder<R>()) | |
inline fun <T, reified R> Dataset<T>.groupByKey(noinline func: (T) -> R): KeyValueGroupedDataset<R, T> = | |
groupByKey(MapFunction { func(it) }, encoder<R>()) | |
inline fun <T, reified R> Dataset<T>.mapPartitions(noinline func: (Iterator<T>) -> Iterator<R>): Dataset<R> = | |
mapPartitions(func, encoder<R>()) | |
inline fun <KEY, VALUE, reified R> KeyValueGroupedDataset<KEY, VALUE>.mapValues(crossinline func: (VALUE) -> R) = | |
mapValues(MapFunction { func(it) }, encoder<R>()) | |
inline fun <KEY, VALUE, reified R> KeyValueGroupedDataset<KEY, VALUE>.mapGroups(crossinline func: (KEY, Iterator<VALUE>) -> R) = | |
mapGroups(MapGroupsFunction { a, b -> func(a, b) }, encoder<R>()) | |
inline fun <reified R> Dataset<Row>.cast(): Dataset<R> = `as`(encoder<R>()) | |
fun <T, R> Iterator<T>.map(func: (T) -> R): Iterator<R> { | |
val self = this | |
return object : AbstractIterator<R>() { | |
override fun computeNext() { | |
while (self.hasNext()) { | |
setNext(func(self.next())) | |
} | |
done() | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
fun main() { | |
val spark = SparkSession.builder().enableHiveSupport().getOrCreate() | |
spark | |
.toDS(listOf(1, 2, 3)) | |
.map { Pair(it, it + 1) } | |
val z = spark | |
.read() | |
.csv("") | |
.cast<Pair<String, Int>>() | |
.mapPartitions { it.map { pair -> pair.first } } | |
.flatMap { it.chars().iterator() } | |
.map { it.toString() to it } | |
.groupByKey { it.first } | |
.mapGroups { a, b -> "$a$b" to b } | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment