Created
December 3, 2022 00:56
-
-
Save laysakura/175cf1137c8cb3356236ed7ab2443b71 to your computer and use it in GitHub Desktop.
Apache Beam feature request: `IntervalWindow`'s [`start`, `end`) for merged windows (e.g. Sessions)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| {"timestamp": "2022-08-01T00:00:00.000+00:00", "temperature [°C]": 0.0, "rainfall [mm]": 0.0} | |
| {"timestamp": "2022-08-01T01:00:00.000+00:00", "temperature [°C]": 0.0, "rainfall [mm]": 0.0} | |
| {"timestamp": "2022-08-01T02:00:00.000+00:00", "temperature [°C]": 0.0, "rainfall [mm]": 0.0} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package org.apache.beam.sp_handson.sec03.p03; | |
| import org.apache.beam.sdk.Pipeline; | |
| import org.apache.beam.sdk.io.kafka.KafkaIO; | |
| import org.apache.beam.sdk.options.PipelineOptionsFactory; | |
| import org.apache.beam.sdk.transforms.Count; | |
| import org.apache.beam.sdk.transforms.DoFn; | |
| import org.apache.beam.sdk.transforms.MapElements; | |
| import org.apache.beam.sdk.transforms.ParDo; | |
| import org.apache.beam.sdk.transforms.windowing.IntervalWindow; | |
| import org.apache.beam.sdk.transforms.windowing.Sessions; | |
| import org.apache.beam.sdk.transforms.windowing.Window; | |
| import org.apache.beam.sdk.values.KV; | |
| import org.apache.beam.sdk.values.PCollection; | |
| import org.apache.beam.sdk.values.TypeDescriptors; | |
| import org.apache.beam.sp_handson.sec02.p02.Weather; | |
| import org.apache.beam.sp_handson.sec03.p01.WeatherTimestampPolicyFactory; | |
| import org.apache.kafka.common.serialization.LongDeserializer; | |
| import org.apache.kafka.common.serialization.StringDeserializer; | |
| import org.apache.kafka.common.serialization.StringSerializer; | |
| import org.joda.time.DateTimeZone; | |
| import org.joda.time.Duration; | |
| import org.joda.time.Instant; | |
| import com.fasterxml.jackson.core.JsonProcessingException; | |
| import com.fasterxml.jackson.databind.ObjectMapper; | |
| // 30℃以上の気温が継続した日数をカウントするパイプライン | |
| public class HeatWavePipeline { | |
| // アメダス観測のJSON文字列をWeatherクラスにマッピングするため | |
| private static final ObjectMapper objectMapper = new ObjectMapper(); | |
| public static void main(String[] args) { | |
| Pipeline p = Pipeline.create( | |
| PipelineOptionsFactory.fromArgs(args).withValidation().create()); | |
| // Kafkaソース入力 | |
| PCollection<KV<Long, String>> kafkaInput = p.apply( | |
| KafkaIO.<Long, String>read() | |
| .withBootstrapServers("localhost:9092") | |
| // 東京用のトピックを使用 | |
| .withTopic("weather-tokyo") | |
| .withKeyDeserializer(LongDeserializer.class) | |
| .withValueDeserializer(StringDeserializer.class) | |
| .withTimestampPolicyFactory(new WeatherTimestampPolicyFactory<>()) | |
| .withReadCommitted() | |
| .withoutMetadata()); | |
| // KV<Long, String> のバリュー部分をJSONとしてパースし、Weatherクラスにマッピング | |
| PCollection<Weather> weather = kafkaInput.apply( | |
| ParDo.of(new DoFn<KV<Long, String>, Weather>() { | |
| @ProcessElement | |
| public void processElement(@Element KV<Long, String> rawWeather, | |
| OutputReceiver<Weather> out) | |
| throws JsonProcessingException { | |
| String jsonWeather = rawWeather.getValue(); | |
| Weather weather = objectMapper.readValue(jsonWeather, Weather.class); | |
| out.output(weather); | |
| } | |
| })); | |
| // 気象情報から気温だけを抽出 | |
| PCollection<Float> temperature = weather.apply( | |
| MapElements.into(TypeDescriptors.floats()) | |
| .via(w -> w.temperatureC)); | |
| // 30℃以上の気温だけをフィルタリング | |
| PCollection<Float> temperatureOver30 = temperature.apply( | |
| ParDo.of(new DoFn<Float, Float>() { | |
| @ProcessElement | |
| public void processElement( | |
| @Element Float t, | |
| OutputReceiver<Float> out) { | |
| if (t >= 30.0) { | |
| out.output(t); | |
| } | |
| } | |
| })); | |
| // Event time-basedなセッションウィンドウ(セッション持続時間 = 1日)を構築 | |
| PCollection<Float> windowedTemperature = temperatureOver30.apply( | |
| Window.<Float>into( | |
| Sessions.withGapDuration(Duration.standardDays(1)))); | |
| // ウィンドウ情報からウィンドウ開始点を取得 | |
| PCollection<KV<Instant, Float>> windowedTemperatureWithDate = windowedTemperature.apply( | |
| ParDo.of(new DoFn<Float, KV<Instant, Float>>() { | |
| @ProcessElement | |
| public void processElement( | |
| @Element Float temperature, | |
| IntervalWindow window, | |
| OutputReceiver<KV<Instant, Float>> out) { | |
| Instant winEnd = window.maxTimestamp(); | |
| System.out.println(window); | |
| out.output(KV.of(winEnd, temperature)); | |
| } | |
| })); | |
| // ウィンドウ開始点毎(セッションウィンドウ毎)に、 | |
| // 各ウィンドウでのイベントの個数(30℃以上が継続した日数)をカウント | |
| PCollection<KV<Instant, Long>> eventCounts = windowedTemperatureWithDate.apply( | |
| Count.<Instant, Float>perKey()); | |
| // フォーマットして文字列化 | |
| PCollection<String> meanTemperatureLine = eventCounts.apply( | |
| MapElements | |
| .into(TypeDescriptors.strings()) | |
| .via(cnt -> "leftmost datetime:" | |
| // 日本時間での日時 | |
| + cnt.getKey().toDateTime(DateTimeZone.forID("+09:00")).toString() | |
| + "\tcount:" | |
| + cnt)); | |
| // Kafkaシンク出力 | |
| meanTemperatureLine.apply( | |
| KafkaIO.<Void, String>write() | |
| .withBootstrapServers("localhost:9092") | |
| .withTopic("beam-out") | |
| .withValueSerializer(StringSerializer.class) | |
| .values()); | |
| p.run(); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package org.apache.beam.sp_handson.sec02.p02; | |
| import org.apache.beam.sdk.schemas.JavaFieldSchema; | |
| import org.apache.beam.sdk.schemas.annotations.DefaultSchema; | |
| import org.apache.beam.sdk.schemas.annotations.SchemaCreate; | |
| import com.fasterxml.jackson.annotation.JsonCreator; | |
| import com.fasterxml.jackson.annotation.JsonProperty; | |
| import lombok.EqualsAndHashCode; | |
| // アメダス気象情報 | |
| @DefaultSchema(JavaFieldSchema.class) // BeamのSchemaとする | |
| @EqualsAndHashCode // イベント同士の妥当な一致比較を提供 | |
| public class Weather { | |
| // 観測日時 (RFC-3339 形式) | |
| public final String timestamp; | |
| // 気温 [℃] | |
| public final float temperatureC; | |
| // 降水量 [mm] | |
| public final float rainfallMm; | |
| @JsonCreator // JacksonでJSONパース | |
| @SchemaCreate // BeamのSchemaのコンストラクタ | |
| public Weather( | |
| @JsonProperty("timestamp") String timestamp, | |
| @JsonProperty("temperature [°C]") float temperatureC, | |
| @JsonProperty("rainfall [mm]") float rainfallMm) { | |
| this.timestamp = timestamp; | |
| this.temperatureC = temperatureC; | |
| this.rainfallMm = rainfallMm; | |
| } | |
| public String toLine() { | |
| return this.timestamp + "\ttemperature:" + this.temperatureC + "\trainfall:" + this.rainfallMm; | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package org.apache.beam.sp_handson.sec03.p01; | |
| import java.util.Optional; | |
| import org.apache.beam.sdk.io.kafka.KafkaRecord; | |
| import org.apache.beam.sdk.io.kafka.TimestampPolicy; | |
| import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory; | |
| import org.apache.beam.sp_handson.sec02.p02.Weather; | |
| import org.apache.kafka.common.TopicPartition; | |
| import org.joda.time.Instant; | |
| import com.fasterxml.jackson.core.JsonProcessingException; | |
| import com.fasterxml.jackson.databind.ObjectMapper; | |
| // KafkaIO.Read で使う、Event timeの割当ポリシー(のファクトリ)。 | |
| // 値がJSON文字列であること、Weatherクラスにマッピングできることを前提とし動作。 | |
| // | |
| // KafkaレコードをいちいちJSONパースするのでパフォーマンス上は良い実装とは言えない。 | |
| public class WeatherTimestampPolicyFactory<K> implements TimestampPolicyFactory<K, String> { | |
| private static final ObjectMapper objectMapper = new ObjectMapper(); | |
| @Override | |
| public TimestampPolicy<K, String> createTimestampPolicy( | |
| TopicPartition tp, Optional<Instant> previousWatermark) { | |
| return new TimestampPolicy<K, String>() { | |
| Instant lastTimestamp = Instant.EPOCH; | |
| @Override | |
| public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<K, String> rec) { | |
| String jsonWeather = rec.getKV().getValue(); | |
| Weather weather; | |
| try { | |
| weather = objectMapper.readValue(jsonWeather, Weather.class); | |
| } catch (JsonProcessingException e) { | |
| e.printStackTrace(); | |
| return Instant.EPOCH; | |
| } | |
| this.lastTimestamp = Instant.parse(weather.timestamp); | |
| return this.lastTimestamp; | |
| } | |
| @Override | |
| public Instant getWatermark(PartitionContext ctx) { | |
| Instant prevWatermark = previousWatermark.orElse(Instant.EPOCH); | |
| // ウォーターマークは単調増加になるようにする | |
| if (this.lastTimestamp.getMillis() > prevWatermark.getMillis()) { | |
| return this.lastTimestamp; | |
| } else { | |
| return prevWatermark; | |
| } | |
| } | |
| }; | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment