Skip to content

Instantly share code, notes, and snippets.

@laysakura
Created December 3, 2022 00:56
Show Gist options
  • Select an option

  • Save laysakura/175cf1137c8cb3356236ed7ab2443b71 to your computer and use it in GitHub Desktop.

Select an option

Save laysakura/175cf1137c8cb3356236ed7ab2443b71 to your computer and use it in GitHub Desktop.
Apache Beam feature request: `IntervalWindow`'s [`start`, `end`) for merged windows (e.g. Sessions)
{"timestamp": "2022-08-01T00:00:00.000+00:00", "temperature [°C]": 0.0, "rainfall [mm]": 0.0}
{"timestamp": "2022-08-01T01:00:00.000+00:00", "temperature [°C]": 0.0, "rainfall [mm]": 0.0}
{"timestamp": "2022-08-01T02:00:00.000+00:00", "temperature [°C]": 0.0, "rainfall [mm]": 0.0}
package org.apache.beam.sp_handson.sec03.p03;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sp_handson.sec02.p02.Weather;
import org.apache.beam.sp_handson.sec03.p01.WeatherTimestampPolicyFactory;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Instant;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
// 30℃以上の気温が継続した日数をカウントするパイプライン
public class HeatWavePipeline {
// アメダス観測のJSON文字列をWeatherクラスにマッピングするため
private static final ObjectMapper objectMapper = new ObjectMapper();
public static void main(String[] args) {
Pipeline p = Pipeline.create(
PipelineOptionsFactory.fromArgs(args).withValidation().create());
// Kafkaソース入力
PCollection<KV<Long, String>> kafkaInput = p.apply(
KafkaIO.<Long, String>read()
.withBootstrapServers("localhost:9092")
// 東京用のトピックを使用
.withTopic("weather-tokyo")
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withTimestampPolicyFactory(new WeatherTimestampPolicyFactory<>())
.withReadCommitted()
.withoutMetadata());
// KV<Long, String> のバリュー部分をJSONとしてパースし、Weatherクラスにマッピング
PCollection<Weather> weather = kafkaInput.apply(
ParDo.of(new DoFn<KV<Long, String>, Weather>() {
@ProcessElement
public void processElement(@Element KV<Long, String> rawWeather,
OutputReceiver<Weather> out)
throws JsonProcessingException {
String jsonWeather = rawWeather.getValue();
Weather weather = objectMapper.readValue(jsonWeather, Weather.class);
out.output(weather);
}
}));
// 気象情報から気温だけを抽出
PCollection<Float> temperature = weather.apply(
MapElements.into(TypeDescriptors.floats())
.via(w -> w.temperatureC));
// 30℃以上の気温だけをフィルタリング
PCollection<Float> temperatureOver30 = temperature.apply(
ParDo.of(new DoFn<Float, Float>() {
@ProcessElement
public void processElement(
@Element Float t,
OutputReceiver<Float> out) {
if (t >= 30.0) {
out.output(t);
}
}
}));
// Event time-basedなセッションウィンドウ(セッション持続時間 = 1日)を構築
PCollection<Float> windowedTemperature = temperatureOver30.apply(
Window.<Float>into(
Sessions.withGapDuration(Duration.standardDays(1))));
// ウィンドウ情報からウィンドウ開始点を取得
PCollection<KV<Instant, Float>> windowedTemperatureWithDate = windowedTemperature.apply(
ParDo.of(new DoFn<Float, KV<Instant, Float>>() {
@ProcessElement
public void processElement(
@Element Float temperature,
IntervalWindow window,
OutputReceiver<KV<Instant, Float>> out) {
Instant winEnd = window.maxTimestamp();
System.out.println(window);
out.output(KV.of(winEnd, temperature));
}
}));
// ウィンドウ開始点毎(セッションウィンドウ毎)に、
// 各ウィンドウでのイベントの個数(30℃以上が継続した日数)をカウント
PCollection<KV<Instant, Long>> eventCounts = windowedTemperatureWithDate.apply(
Count.<Instant, Float>perKey());
// フォーマットして文字列化
PCollection<String> meanTemperatureLine = eventCounts.apply(
MapElements
.into(TypeDescriptors.strings())
.via(cnt -> "leftmost datetime:"
// 日本時間での日時
+ cnt.getKey().toDateTime(DateTimeZone.forID("+09:00")).toString()
+ "\tcount:"
+ cnt));
// Kafkaシンク出力
meanTemperatureLine.apply(
KafkaIO.<Void, String>write()
.withBootstrapServers("localhost:9092")
.withTopic("beam-out")
.withValueSerializer(StringSerializer.class)
.values());
p.run();
}
}
package org.apache.beam.sp_handson.sec02.p02;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.EqualsAndHashCode;
/**
 * A single AMeDAS weather observation.
 *
 * <p>Registered as a Beam schema type through {@link JavaFieldSchema}; Lombok generates
 * value-based {@code equals}/{@code hashCode} so events compare by content.
 */
@DefaultSchema(JavaFieldSchema.class)
@EqualsAndHashCode
public class Weather {
  /** Observation date-time, as an RFC 3339 string. */
  public final String timestamp;

  /** Air temperature in °C. */
  public final float temperatureC;

  /** Rainfall in mm. */
  public final float rainfallMm;

  /**
   * Builds an observation. Jackson uses this constructor for JSON parsing (property names
   * given by {@code @JsonProperty}), and Beam uses it to construct schema rows.
   *
   * @param timestamp observation date-time string
   * @param temperatureC temperature in °C
   * @param rainfallMm rainfall in mm
   */
  @JsonCreator
  @SchemaCreate
  public Weather(
      @JsonProperty("timestamp") String timestamp,
      @JsonProperty("temperature [°C]") float temperatureC,
      @JsonProperty("rainfall [mm]") float rainfallMm) {
    this.timestamp = timestamp;
    this.temperatureC = temperatureC;
    this.rainfallMm = rainfallMm;
  }

  /**
   * Renders this observation as one tab-separated line.
   *
   * @return {@code <timestamp>\ttemperature:<temperatureC>\trainfall:<rainfallMm>}
   */
  public String toLine() {
    StringBuilder line = new StringBuilder();
    line.append(timestamp);
    line.append("\ttemperature:").append(temperatureC);
    line.append("\trainfall:").append(rainfallMm);
    return line.toString();
  }
}
package org.apache.beam.sp_handson.sec03.p01;
import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.beam.sp_handson.sec02.p02.Weather;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.Instant;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * Factory for the event-time {@link TimestampPolicy} used with {@code KafkaIO.Read}.
 *
 * <p>Each record value is expected to be a JSON string that maps onto {@link Weather}; the
 * record's event time is that object's {@code timestamp}. Records whose event time cannot be
 * determined (invalid JSON, a JSON {@code null}, a missing or malformed timestamp) are given
 * {@link Instant#EPOCH} and do not advance the watermark.
 *
 * <p>Every Kafka record is JSON-parsed here, so this is not a performance-oriented
 * implementation.
 *
 * @param <K> Kafka record key type (not inspected)
 */
public class WeatherTimestampPolicyFactory<K> implements TimestampPolicyFactory<K, String> {
  private static final ObjectMapper objectMapper = new ObjectMapper();

  @Override
  public TimestampPolicy<K, String> createTimestampPolicy(
      TopicPartition tp, Optional<Instant> previousWatermark) {
    return new TimestampPolicy<K, String>() {
      // Latest event time seen on this partition. It only ever moves forward, so an
      // out-of-order (older) record cannot pull the watermark back.
      Instant lastTimestamp = Instant.EPOCH;

      @Override
      public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<K, String> rec) {
        Optional<Instant> parsed = parseEventTime(rec.getKV().getValue());
        if (!parsed.isPresent()) {
          // Best effort: unusable records get the epoch (and will typically be late).
          return Instant.EPOCH;
        }
        Instant eventTime = parsed.get();
        if (eventTime.isAfter(this.lastTimestamp)) {
          this.lastTimestamp = eventTime;
        }
        return eventTime;
      }

      @Override
      public Instant getWatermark(PartitionContext ctx) {
        Instant prevWatermark = previousWatermark.orElse(Instant.EPOCH);
        // Keep the watermark monotonic: never report less than the watermark this
        // policy was created with.
        return this.lastTimestamp.isAfter(prevWatermark) ? this.lastTimestamp : prevWatermark;
      }
    };
  }

  /**
   * Extracts the event time from a Weather JSON string.
   *
   * @param jsonWeather record value; may be invalid JSON or {@code null}
   * @return the parsed {@code timestamp}, or empty if it cannot be determined
   */
  private static Optional<Instant> parseEventTime(String jsonWeather) {
    try {
      Weather weather = objectMapper.readValue(jsonWeather, Weather.class);
      if (weather == null || weather.timestamp == null) {
        return Optional.empty();
      }
      return Optional.of(Instant.parse(weather.timestamp));
    } catch (JsonProcessingException | IllegalArgumentException e) {
      // JsonProcessingException: invalid JSON. IllegalArgumentException: malformed
      // timestamp, or (depending on the Jackson version) a null record value.
      e.printStackTrace();
      return Optional.empty();
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment