Skip to content

Instantly share code, notes, and snippets.

@Igosuki
Last active August 16, 2017 11:06
Show Gist options
  • Save Igosuki/6a79e8b8a182dd1eb64a4bf255308061 to your computer and use it in GitHub Desktop.
Save Igosuki/6a79e8b8a182dd1eb64a4bf255308061 to your computer and use it in GitHub Desktop.
Buggy `reduceByKey` in Scio with non-zero allowed lateness

sbt "run --input=stuff.log"

import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
import com.fasterxml.jackson.annotation.PropertyAccessor
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.spotify.scio.ContextAndArgs
import com.spotify.scio.values.WindowOptions
import org.apache.beam.sdk.io.TextIO.CompressionType
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode
import org.joda.time.Duration
/**
 * Minimal reproduction of a windowed `reduceByKey` issue in Scio.
 *
 * Reads newline-delimited JSON records (e.g. `{"name": "a", "value": 1}`) from `--input`,
 * assigns them to fixed windows fired by a processing-time trigger with a configurable
 * allowed lateness, and sums `value` per `name` within each window.
 *
 * Command-line arguments:
 *  - `--input` (required): path of the uncompressed input text file.
 *  - `--statsAggWindowDuration` (default 10): fixed window size, in seconds.
 *  - `--allowedLateness` (default 1): allowed lateness, in seconds.
 */
object LateReduceExample {

  /** One parsed input record. */
  final case class Stuff(name: String, value: Int)

  /** Grouping key for [[Stuff]] records. */
  final case class StuffKey(name: String)

  /**
   * Shared Jackson mapper configured for Scala case classes.
   *
   * Built lazily, once per JVM, instead of on every access: as a `def` it constructed and
   * configured a brand-new `ObjectMapper` (including Scala-module registration) for every
   * input line parsed in `main`. Once configured, an `ObjectMapper` may be shared across
   * threads for reading.
   */
  lazy val mapper: ObjectMapper with ScalaObjectMapper = {
    val m = new ObjectMapper() with ScalaObjectMapper
    m.registerModule(DefaultScalaModule)
    m.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    m.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
    m.setVisibility(PropertyAccessor.FIELD, Visibility.ANY)
    m
  }

  /**
   * Builds and runs the reproduction pipeline, blocking until it finishes.
   *
   * @param cmdlineArgs raw command-line arguments, parsed by Scio's `ContextAndArgs`
   */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    val windowDuration = Duration.standardSeconds(args.int("statsAggWindowDuration", 10))
    val allowedLateness = Duration.standardSeconds(args.int("allowedLateness", 1))

    // No sink is attached: the pipeline exists only to exercise the windowed reduce.
    sc.textFile(args("input"), CompressionType.UNCOMPRESSED)
      .map(line => mapper.readValue[Stuff](line))
      .withFixedWindows(
        windowDuration,
        options = WindowOptions(
          trigger = AfterProcessingTime.pastFirstElementInPane(),
          accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
          // Per the original report, any non-zero allowed lateness fires empty windows
          // and makes the downstream reduceByKey misbehave in Scio.
          allowedLateness = allowedLateness
        )
      )
      .map(stuff => (StuffKey(stuff.name), stuff.value))
      .reduceByKey(_ + _)

    sc.close().waitUntilDone()
  }
}
{"name": "a", "value": 1}
{"name": "b", "value": 1}
{"name": "a", "value": 3}
{"name": "c", "value": 2}
{"name": "b", "value": 5}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment