Skip to content

Instantly share code, notes, and snippets.

@damondouglas
Created February 2, 2024 19:18
Show Gist options
  • Save damondouglas/d4606679ceab30446b0225716ac2b8fd to your computer and use it in GitHub Desktop.
Save damondouglas/d4606679ceab30446b0225716ac2b8fd to your computer and use it in GitHub Desktop.
State not compatible with Splittable DoFns.
package example;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;
/**
 * Minimal reproduction of a splittable {@link DoFn} that also declares user state.
 *
 * <p>The DoFn is splittable: it defines {@code @GetInitialRestriction} and {@code @NewTracker},
 * and its {@code @ProcessElement} method takes a {@link RestrictionTracker}. It also declares a
 * {@code @StateId} cell that is never read or written. The state exists only to exercise the
 * state/splittable combination.
 *
 * <p>NOTE(review): per the gist title, Beam is expected to reject this combination when the DoFn
 * is wrapped with {@link ParDo#of}. Confirm against the Beam version in use.
 *
 * <p>When processing does run, the DoFn emits one element of the input list per invocation. Each
 * element is stamped with the current wall-clock time, and the DoFn asks the runner to resume
 * after {@link #interval}.
 *
 * @param <T> type of the list elements that are emitted one at a time
 */
class StateNotCompatibleWithSDF<T> extends DoFn<KV<Integer, List<T>>, KV<Integer, T>> {

  /** Id of the intentionally unused state cell declared below. */
  private static final String STATE_SPEC_ID = "some-state";

  /** Resume delay requested from the runner between consecutive emitted elements. */
  private final Duration interval;

  // Declaring any state on a splittable DoFn is what this example demonstrates; the cell itself
  // is never accessed.
  @StateId(STATE_SPEC_ID)
  private final StateSpec<ValueState<Long>> someStateSpec = StateSpecs.value();

  private StateNotCompatibleWithSDF(Duration interval) {
    this.interval = interval;
  }

  /**
   * Returns the restriction {@code [0, size)}, which covers every index of the element's list.
   *
   * <p>The range starts at 0 so that an empty list produces an empty restriction. {@link
   * #process} may stop without claiming anything, and an offset range tracker treats a non-empty
   * range with no claimed offset as unfinished work. A start of -1 would make even an empty list
   * non-empty.
   *
   * @param element keyed list to be emitted element by element; its value must be non-null
   * @return offset range over the list indices
   */
  @GetInitialRestriction
  public OffsetRange getInitialRange(@Element KV<Integer, List<T>> element) {
    return new OffsetRange(0, checkStateNotNull(element.getValue()).size());
  }

  /** Creates an {@link OffsetRangeTracker} over the given restriction. */
  @NewTracker
  public RestrictionTracker<OffsetRange, Long> newTracker(@Restriction OffsetRange restriction) {
    return new OffsetRangeTracker(restriction);
  }

  /** The manual watermark estimator starts at the minimum possible timestamp. */
  @GetInitialWatermarkEstimatorState
  public Instant getInitialWatermarkState() {
    return BoundedWindow.TIMESTAMP_MIN_VALUE;
  }

  /** Creates a manual watermark estimator that {@link #process} advances explicitly. */
  @NewWatermarkEstimator
  public WatermarkEstimator<Instant> newWatermarkEstimator(@WatermarkEstimatorState Instant state) {
    return new WatermarkEstimators.Manual(state);
  }

  /**
   * Returns {@code null} as the truncation result.
   *
   * <p>The original author's intent is to stop emitting immediately upon drain. NOTE(review):
   * relies on Beam treating a {@code null} result as "no remaining work"; confirm in the DoFn
   * Javadoc.
   */
  @TruncateRestriction
  public RestrictionTracker.TruncateResult<OffsetRange> truncate() {
    return null;
  }

  /**
   * Emits the list element at the first offset of the current restriction.
   *
   * <p>On a successful claim, this method:
   *
   * <ul>
   *   <li>advances the watermark to the current wall-clock time;
   *   <li>outputs the element with that same timestamp;
   *   <li>asks the runner to resume after {@link #interval}.
   * </ul>
   *
   * It returns {@code stop()} when the list is null or empty, or when the offset cannot be
   * claimed.
   *
   * @param valueState declared state cell; intentionally unused (see class documentation)
   * @param element keyed list being emitted
   * @param estimator manual watermark estimator advanced on each emission
   * @param restrictionTracker tracker over the list indices still to be emitted
   * @param receiver destination for {@code (key, listElement)} outputs
   * @return {@code resume()} with the configured delay after emitting, otherwise {@code stop()}
   */
  @ProcessElement
  public ProcessContinuation process(
      @StateId(STATE_SPEC_ID) ValueState<Long> valueState,
      @Element KV<Integer, List<T>> element,
      ManualWatermarkEstimator<Instant> estimator,
      RestrictionTracker<OffsetRange, Long> restrictionTracker,
      OutputReceiver<KV<Integer, T>> receiver) {
    List<T> values = element.getValue();
    if (values == null || values.isEmpty()) {
      return ProcessContinuation.stop();
    }
    // Restrictions start at 0 and only shrink from the front as offsets are claimed,
    // so the start offset is always a valid, non-negative list index candidate.
    long position = checkStateNotNull(restrictionTracker.currentRestriction()).getFrom();
    if (!restrictionTracker.tryClaim(position)) {
      return ProcessContinuation.stop();
    }
    Instant ts = Instant.now();
    estimator.setWatermark(ts);
    receiver.outputWithTimestamp(
        KV.of(element.getKey(), values.get(Math.toIntExact(position))), ts);
    return ProcessContinuation.resume().withResumeDelay(interval);
  }

  /**
   * Builds and runs a pipeline that feeds a single {@code KV(0, [0..9])} element into this DoFn,
   * then prints every output with the time it was observed.
   *
   * <p>NOTE(review): per the gist title, this is expected to fail because the DoFn is splittable
   * and declares state. Confirm against the Beam version in use.
   */
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    List<Integer> items = Stream.iterate(0, i -> i + 1).limit(10).collect(Collectors.toList());
    p.apply(Create.of(KV.of(0, items)))
        .apply(ParDo.of(new StateNotCompatibleWithSDF<>(Duration.standardSeconds(1L))))
        .apply(
            ParDo.of(
                new DoFn<KV<Integer, Integer>, KV<Integer, Integer>>() {
                  @ProcessElement
                  public void process(@Element KV<Integer, Integer> element) {
                    System.out.printf("%s: %s%n", Instant.now(), element);
                  }
                }));
    // Block until completion on any runner, not only runners that block in run().
    p.run().waitUntilFinish();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment