Created February 2, 2024 19:18
-
-
Save damondouglas/d4606679ceab30446b0225716ac2b8fd to your computer and use it in GitHub Desktop.
State is not compatible with splittable DoFns.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example; | |
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; | |
import java.util.List; | |
import java.util.stream.Collectors; | |
import java.util.stream.Stream; | |
import org.apache.beam.sdk.Pipeline; | |
import org.apache.beam.sdk.io.range.OffsetRange; | |
import org.apache.beam.sdk.state.StateSpec; | |
import org.apache.beam.sdk.state.StateSpecs; | |
import org.apache.beam.sdk.state.ValueState; | |
import org.apache.beam.sdk.transforms.*; | |
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; | |
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; | |
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; | |
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator; | |
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators; | |
import org.apache.beam.sdk.transforms.windowing.BoundedWindow; | |
import org.apache.beam.sdk.values.KV; | |
import org.joda.time.Duration; | |
import org.joda.time.Instant; | |
class StateNotCompatibleWithSDF<T> extends DoFn<KV<Integer, List<T>>, KV<Integer, T>> { | |
private final Duration interval; | |
private static final String STATE_SPEC_IC = "some-state"; | |
@StateId(STATE_SPEC_IC) | |
private final StateSpec<ValueState<Long>> someStateSpec = StateSpecs.value(); | |
private StateNotCompatibleWithSDF(Duration interval) { | |
this.interval = interval; | |
} | |
@GetInitialRestriction | |
public OffsetRange getInitialRange(@Element KV<Integer, List<T>> element) { | |
return new OffsetRange(-1, checkStateNotNull(element.getValue()).size()); | |
} | |
@NewTracker | |
public RestrictionTracker<OffsetRange, Long> newTracker(@Restriction OffsetRange restriction) { | |
return new OffsetRangeTracker(restriction); | |
} | |
@GetInitialWatermarkEstimatorState | |
public Instant getInitialWatermarkState() { | |
return BoundedWindow.TIMESTAMP_MIN_VALUE; | |
} | |
@NewWatermarkEstimator | |
public WatermarkEstimator<Instant> newWatermarkEstimator(@WatermarkEstimatorState Instant state) { | |
return new WatermarkEstimators.Manual(state); | |
} | |
@TruncateRestriction | |
public RestrictionTracker.TruncateResult<OffsetRange> truncate() { | |
// stop emitting immediately upon drain | |
return null; | |
} | |
@ProcessElement | |
public ProcessContinuation process( | |
@StateId(STATE_SPEC_IC) ValueState<Long> valueState, | |
@Element KV<Integer, List<T>> element, | |
ManualWatermarkEstimator<Instant> estimator, | |
RestrictionTracker<OffsetRange, Long> restrictionTracker, | |
OutputReceiver<KV<Integer, T>> receiver) { | |
if (element.getValue() == null || element.getValue().isEmpty()) { | |
return ProcessContinuation.stop(); | |
} | |
long position = checkStateNotNull(restrictionTracker.currentRestriction()).getFrom(); | |
if (position < 0) { | |
position = 0; | |
} | |
Instant ts = Instant.now(); | |
if (restrictionTracker.tryClaim(position)) { | |
int index = (int) position; | |
estimator.setWatermark(ts); | |
receiver.outputWithTimestamp(KV.of(element.getKey(), element.getValue().get(index)), ts); | |
return ProcessContinuation.resume().withResumeDelay(interval); | |
} | |
return ProcessContinuation.stop(); | |
} | |
public static void main(String[] args) { | |
Pipeline p = Pipeline.create(); | |
List<Integer> items = Stream.iterate(0, i -> i + 1).limit(10).collect(Collectors.toList()); | |
p.apply(Create.of(KV.of(0, items))) | |
.apply(ParDo.of(new StateNotCompatibleWithSDF<>(Duration.standardSeconds(1L)))) | |
.apply( | |
ParDo.of( | |
new DoFn<KV<Integer, Integer>, KV<Integer, Integer>>() { | |
@ProcessElement | |
public void process(@Element KV<Integer, Integer> element) { | |
System.out.printf("%s: %s\n", Instant.now(), element); | |
} | |
})); | |
p.run(); | |
} | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.