@stephenc
Created February 7, 2020 11:31
11:14:58.488 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 585 @ 1581074098488 for job b8c8f1c473708fdb1a1c20c059f94a16.
11:14:58.496 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 585 for job b8c8f1c473708fdb1a1c20c059f94a16 (656 bytes in 7 ms).
11:15:01.039 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job recovers via failover strategy: full graph restart
11:15:01.040 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.jobmaster.JobMaster - Running initialization on master for job Test topology (b8c8f1c473708fdb1a1c20c059f94a16).
11:15:01.040 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.jobmaster.JobMaster - Successfully ran initialization on master in 0 ms.
11:15:01.040 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.jobmaster.JobMaster - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'file:/tmp', asynchronous: TRUE, maxStateSize: 5242880)
11:15:01.041 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 586 @ 1581074101041 for job b8c8f1c473708fdb1a1c20c059f94a16.
11:15:01.051 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 586 for job b8c8f1c473708fdb1a1c20c059f94a16 (656 bytes in 10 ms).
11:15:01.053 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.jobmaster.JobMaster - Attempting to load configured state backend for savepoint disposal
11:15:01.053 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.jobmaster.JobMaster - No state backend configured, attempting to dispose savepoint with default backend (file system based)
11:15:01.056 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Starting job b8c8f1c473708fdb1a1c20c059f94a16 from savepoint file:/tmp/savepoint-b8c8f1-008d657fefe6 ()
11:15:01.056 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Reset the checkpoint ID of job b8c8f1c473708fdb1a1c20c059f94a16 to 587.
11:15:01.056 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring job b8c8f1c473708fdb1a1c20c059f94a16 from latest valid checkpoint: Checkpoint 586 @ 0 for b8c8f1c473708fdb1a1c20c059f94a16.
11:15:01.057 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - No master state to restore
11:15:01.057 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Test topology (b8c8f1c473708fdb1a1c20c059f94a16) switched from state RUNNING to SUSPENDED.
org.apache.flink.util.FlinkException: Job is being rescaled.
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor.aroundReceive(Actor.scala:502)
at akka.actor.Actor.aroundReceive$(Actor.scala:500)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
11:15:01.058 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Checkpointing parallel source -> Double value -> Sink: Print to Std. Out (1/1) (28d033900d21f5d3786507c782fd6934) switched from RUNNING to CANCELING.
11:15:01.059 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Checkpointing parallel source -> Double value -> Sink: Print to Std. Out (1/1) (28d033900d21f5d3786507c782fd6934) switched from CANCELING to CANCELED.
11:15:01.061 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Test topology (b8c8f1c473708fdb1a1c20c059f94a16) switched from state CREATED to FAILING.
org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$8(JobMaster.java:504)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor.aroundReceive(Actor.scala:502)
at akka.actor.Actor.aroundReceive$(Actor.scala:500)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.util.FlinkException: Failed to rescale the job b8c8f1c473708fdb1a1c20c059f94a16.
... 22 common frames omitted
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Suspend needs to happen atomically
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
... 19 common frames omitted
Caused by: java.lang.IllegalStateException: Suspend needs to happen atomically
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.suspend(ExecutionGraph.java:1172)
at org.apache.flink.runtime.jobmaster.JobMaster.suspendExecutionGraph(JobMaster.java:1221)
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
... 20 common frames omitted
11:15:01.071 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Checkpointing parallel source -> Double value -> Sink: Print to Std. Out (1/1) (6e1aab310eaa02ff4982ea0184001319) switched from CREATED to CANCELING.
11:15:01.071 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Checkpointing parallel source -> Double value -> Sink: Print to Std. Out (1/1) (6e1aab310eaa02ff4982ea0184001319) switched from CANCELING to CANCELED.
11:15:01.072 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Try to restart or fail the job Test topology (b8c8f1c473708fdb1a1c20c059f94a16) if no longer possible.
11:15:01.072 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Test topology (b8c8f1c473708fdb1a1c20c059f94a16) switched from state FAILING to FAILED.
org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$8(JobMaster.java:504)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor.aroundReceive(Actor.scala:502)
at akka.actor.Actor.aroundReceive$(Actor.scala:500)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.util.FlinkException: Failed to rescale the job b8c8f1c473708fdb1a1c20c059f94a16.
... 22 common frames omitted
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Suspend needs to happen atomically
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
... 19 common frames omitted
Caused by: java.lang.IllegalStateException: Suspend needs to happen atomically
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.suspend(ExecutionGraph.java:1172)
at org.apache.flink.runtime.jobmaster.JobMaster.suspendExecutionGraph(JobMaster.java:1221)
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
... 20 common frames omitted
11:15:01.074 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Could not restart the job Test topology (b8c8f1c473708fdb1a1c20c059f94a16) because a type of SuppressRestartsException was thrown.
org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$8(JobMaster.java:504)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor.aroundReceive(Actor.scala:502)
at akka.actor.Actor.aroundReceive$(Actor.scala:500)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.util.FlinkException: Failed to rescale the job b8c8f1c473708fdb1a1c20c059f94a16.
... 22 common frames omitted
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Suspend needs to happen atomically
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
... 19 common frames omitted
Caused by: java.lang.IllegalStateException: Suspend needs to happen atomically
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.suspend(ExecutionGraph.java:1172)
at org.apache.flink.runtime.jobmaster.JobMaster.suspendExecutionGraph(JobMaster.java:1221)
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
... 20 common frames omitted
11:15:01.074 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job b8c8f1c473708fdb1a1c20c059f94a16.
11:15:01.074 [flink-akka.actor.default-dispatcher-31] INFO o.a.f.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down
11:15:01.074 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.checkpoint.CompletedCheckpoint - Checkpoint with ID 586 at 'file:/tmp/savepoint-b8c8f1-008d657fefe6' not discarded.
11:15:06.527 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job recovers via failover strategy: full graph restart
11:15:06.527 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.jobmaster.JobMaster - Running initialization on master for job Test topology (b8c8f1c473708fdb1a1c20c059f94a16).
11:15:06.527 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.jobmaster.JobMaster - Successfully ran initialization on master in 0 ms.
11:15:06.530 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'totalNumberOfCheckpoints'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology]
11:15:06.530 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'numberOfInProgressCheckpoints'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology]
11:15:06.530 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'numberOfCompletedCheckpoints'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology]
11:15:06.530 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'numberOfFailedCheckpoints'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology]
11:15:06.530 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointRestoreTimestamp'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology]
11:15:06.530 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointSize'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology]
11:15:06.530 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointDuration'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology]
11:15:06.530 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointAlignmentBuffered'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology]
11:15:06.530 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointExternalPath'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology]
11:15:06.531 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.jobmaster.JobMaster - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'file:/tmp', asynchronous: TRUE, maxStateSize: 5242880)
11:15:06.531 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'restartingTime'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology]
11:15:06.531 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'downtime'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology]
11:15:06.531 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'uptime'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology]
11:15:06.531 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'fullRestarts'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology]
11:15:06.531 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint triggering task Source: Checkpointing parallel source -> Double value -> Sink: Print to Std. Out (1/1) of job b8c8f1c473708fdb1a1c20c059f94a16 is not in state RUNNING but CANCELED instead. Aborting checkpoint.
11:15:06.532 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Starting job b8c8f1c473708fdb1a1c20c059f94a16 from savepoint file:/tmp/savepoint-b8c8f1-008d657fefe6 ()
11:15:06.532 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Reset the checkpoint ID of job b8c8f1c473708fdb1a1c20c059f94a16 to 587.
11:15:06.532 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring job b8c8f1c473708fdb1a1c20c059f94a16 from latest valid checkpoint: Checkpoint 586 @ 0 for b8c8f1c473708fdb1a1c20c059f94a16.
11:15:06.533 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - No master state to restore
11:15:47.200 [flink-akka.actor.default-dispatcher-21] INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Releasing idle slot [aafb18441a1c233c209a2d2b3c3821ca].
11:16:37.152 [flink-akka.actor.default-dispatcher-38] INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Releasing idle slot [f9109d62cd3331f2ea4a341259769fed].
package myflink;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Random;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
public class StreamingJob {
public static void main(String[] args) throws Exception {
ParameterTool config = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(config);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// watermarking stays on unless the "watermarking" parameter is set to something other than "enabled"
final boolean watermarking = "enabled".equalsIgnoreCase(config.get("watermarking", "enabled"));
SingleOutputStreamOperator<Long> stream;
switch (config.get("source", "default").toLowerCase(Locale.ENGLISH)) {
default:
stream = env.addSource(new MySource(watermarking))
.uid("1").name("Checkpointing source");
break;
case "parallel":
stream = env.addSource(new MyParallelSource(watermarking))
.uid("2").name("Checkpointing parallel source");
break;
case "iterator":
stream = env.fromCollection(new MyIterator(), Long.class).uid("3").name("Iterator source");
break;
}
if (!"none".equalsIgnoreCase(config.get("map", "double"))) {
stream = stream.map(new DoubleMapFunction()).uid("4").name("Double value");
}
stream.print();
switch (config.get("checkpointing", "exactly-once").toLowerCase(Locale.ENGLISH)) {
case "exactly-once":
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
break;
case "at-least-once":
env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);
break;
default:
break;
}
env.execute("Test topology");
}
public static class DoubleMapFunction implements MapFunction<Long, Long> {
private static final long serialVersionUID = 1L;
@Override
public Long map(Long v) {
// double the value, as the operator name "Double value" says (was v * v, which squares it)
return v == null ? null : v * 2;
}
}
public static class ParallelState implements Serializable {
private static final long serialVersionUID = 1L;
public final long count;
public final int buckets;
public final int index;
public ParallelState(long count, int buckets, int index) {
this.count = count;
this.buckets = buckets;
this.index = index;
}
}
public static class MyParallelSource extends RichParallelSourceFunction<Long> implements CheckpointedFunction {
private static final long serialVersionUID = 1L;
private final boolean watermarking;
private long count = 0L;
private List<ParallelState> recovery = null;
private volatile boolean isRunning = true;
private transient ListState<ParallelState> checkpointedCount;
public MyParallelSource(boolean watermarking) {
this.watermarking = watermarking;
}
public void initializeState(FunctionInitializationContext context) throws Exception {
this.checkpointedCount = context
.getOperatorStateStore()
.getListState(new ListStateDescriptor<>("count", ParallelState.class));
if (context.isRestored()) {
count = 0;
recovery = new ArrayList<>();
for (ParallelState state : this.checkpointedCount.get()) {
this.count = Math.max(this.count, state.count);
recovery.add(state);
}
if (recovery.size() <= 1) {
recovery = null;
}
}
}
public void snapshotState(FunctionSnapshotContext context) throws Exception {
this.checkpointedCount.clear();
final RuntimeContext ctx = getRuntimeContext();
if (recovery != null) {
this.checkpointedCount.addAll(recovery);
} else {
this.checkpointedCount.add(
new ParallelState(
count,
ctx.getNumberOfParallelSubtasks(),
ctx.getIndexOfThisSubtask()
)
);
}
}
@Override
public void run(SourceContext<Long> ctx) throws Exception {
Random entropy = new Random();
long lastWatermark = 0;
final RuntimeContext context = getRuntimeContext();
while (isRunning) {
// this synchronized block ensures that state checkpointing,
// internal state updates and emission of elements are an atomic operation
synchronized (ctx.getCheckpointLock()) {
long now = System.currentTimeMillis();
if (recovery != null && !recovery.isEmpty()) {
final ParallelState s = recovery.remove(0);
ctx.collectWithTimestamp(s.count, now);
// short-cut when restoring state
if (s.count + s.buckets < count) {
recovery.add(new ParallelState(s.count + s.buckets, s.buckets, s.index));
}
}
if (recovery != null && recovery.isEmpty()) {
recovery = null;
}
if (recovery == null) {
if (count % context.getNumberOfParallelSubtasks() == context.getIndexOfThisSubtask()) {
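if (watermarking) {
ctx.collectWithTimestamp(count, now);
// emit a watermark at most once per second and remember when it was emitted
if (now - lastWatermark > 1000L) {
lastWatermark = now;
ctx.emitWatermark(new Watermark(lastWatermark));
}
} else {
// without watermarking, emit the element exactly once, with no timestamp
ctx.collect(count);
}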
}
count++;
}
}
ctx.markAsTemporarilyIdle();
try {
Thread.sleep(200 + entropy.nextInt(100));
} catch (InterruptedException e) {
// ignore
}
}
}
@Override
public void cancel() {
isRunning = false;
}
}
public static class MySource implements SourceFunction<Long>, CheckpointedFunction {
private static final long serialVersionUID = 1L;
private final boolean watermarking;
private long count = 0L;
private volatile boolean isRunning = true;
private transient ListState<Long> checkpointedCount;
public MySource(boolean watermarking) {
this.watermarking = watermarking;
}
public void run(SourceContext<Long> ctx) {
Random entropy = new Random();
long lastWatermark = 0;
while (isRunning) {
// this synchronized block ensures that state checkpointing,
// internal state updates and emission of elements are an atomic operation
synchronized (ctx.getCheckpointLock()) {
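if (watermarking) {
long now = System.currentTimeMillis();
ctx.collectWithTimestamp(count, now);
// emit a watermark at most once per second and remember when it was emitted
if (now - lastWatermark > 1000L) {
lastWatermark = now;
ctx.emitWatermark(new Watermark(lastWatermark));
}
} else {
// without watermarking, emit the element exactly once, with no timestamp
ctx.collect(count);
}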
count++;
}
ctx.markAsTemporarilyIdle();
try {
Thread.sleep(200 + entropy.nextInt(100));
} catch (InterruptedException e) {
// ignore
}
}
}
public void cancel() {
isRunning = false;
}
public void initializeState(FunctionInitializationContext context) throws Exception {
this.checkpointedCount = context
.getOperatorStateStore()
.getListState(new ListStateDescriptor<>("count", Long.class));
if (context.isRestored()) {
for (Long count : this.checkpointedCount.get()) {
this.count = count;
}
}
}
public void snapshotState(FunctionSnapshotContext context) throws Exception {
this.checkpointedCount.clear();
this.checkpointedCount.add(count);
}
}
private static class MyIterator implements Iterator<Long>, Serializable {
private static final long serialVersionUID = 1L;
private long value;
@Override
public boolean hasNext() {
return true;
}
@Override
public Long next() {
try {
Thread.sleep(250);
} catch (InterruptedException e) {
// ignore
}
return value++;
}
}
}