Created
February 7, 2020 11:31
-
-
Save stephenc/0bbc08391ddce5a781242900e4b33a5d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
11:14:58.488 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 585 @ 1581074098488 for job b8c8f1c473708fdb1a1c20c059f94a16. | |
11:14:58.496 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 585 for job b8c8f1c473708fdb1a1c20c059f94a16 (656 bytes in 7 ms). | |
11:15:01.039 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job recovers via failover strategy: full graph restart | |
11:15:01.040 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.jobmaster.JobMaster - Running initialization on master for job Test topology (b8c8f1c473708fdb1a1c20c059f94a16). | |
11:15:01.040 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.jobmaster.JobMaster - Successfully ran initialization on master in 0 ms. | |
11:15:01.040 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.jobmaster.JobMaster - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'file:/tmp', asynchronous: TRUE, maxStateSize: 5242880) | |
11:15:01.041 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 586 @ 1581074101041 for job b8c8f1c473708fdb1a1c20c059f94a16. | |
11:15:01.051 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 586 for job b8c8f1c473708fdb1a1c20c059f94a16 (656 bytes in 10 ms). | |
11:15:01.053 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.jobmaster.JobMaster - Attempting to load configured state backend for savepoint disposal | |
11:15:01.053 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.jobmaster.JobMaster - No state backend configured, attempting to dispose savepoint with default backend (file system based) | |
11:15:01.056 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Starting job b8c8f1c473708fdb1a1c20c059f94a16 from savepoint file:/tmp/savepoint-b8c8f1-008d657fefe6 () | |
11:15:01.056 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Reset the checkpoint ID of job b8c8f1c473708fdb1a1c20c059f94a16 to 587. | |
11:15:01.056 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring job b8c8f1c473708fdb1a1c20c059f94a16 from latest valid checkpoint: Checkpoint 586 @ 0 for b8c8f1c473708fdb1a1c20c059f94a16. | |
11:15:01.057 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - No master state to restore | |
11:15:01.057 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Test topology (b8c8f1c473708fdb1a1c20c059f94a16) switched from state RUNNING to SUSPENDED. | |
org.apache.flink.util.FlinkException: Job is being rescaled. | |
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465) | |
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966) | |
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) | |
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) | |
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392) | |
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185) | |
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) | |
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147) | |
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) | |
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) | |
at akka.actor.Actor.aroundReceive(Actor.scala:502) | |
at akka.actor.Actor.aroundReceive$(Actor.scala:500) | |
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) | |
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) | |
at akka.actor.ActorCell.invoke(ActorCell.scala:495) | |
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) | |
at akka.dispatch.Mailbox.run(Mailbox.scala:224) | |
at akka.dispatch.Mailbox.exec(Mailbox.scala:234) | |
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) | |
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) | |
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) | |
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) | |
11:15:01.058 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Checkpointing parallel source -> Double value -> Sink: Print to Std. Out (1/1) (28d033900d21f5d3786507c782fd6934) switched from RUNNING to CANCELING. | |
11:15:01.059 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Checkpointing parallel source -> Double value -> Sink: Print to Std. Out (1/1) (28d033900d21f5d3786507c782fd6934) switched from CANCELING to CANCELED. | |
11:15:01.061 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Test topology (b8c8f1c473708fdb1a1c20c059f94a16) switched from state CREATED to FAILING. | |
org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause. | |
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$8(JobMaster.java:504) | |
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) | |
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) | |
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) | |
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392) | |
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185) | |
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) | |
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147) | |
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) | |
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) | |
at akka.actor.Actor.aroundReceive(Actor.scala:502) | |
at akka.actor.Actor.aroundReceive$(Actor.scala:500) | |
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) | |
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) | |
at akka.actor.ActorCell.invoke(ActorCell.scala:495) | |
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) | |
at akka.dispatch.Mailbox.run(Mailbox.scala:224) | |
at akka.dispatch.Mailbox.exec(Mailbox.scala:234) | |
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) | |
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) | |
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) | |
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) | |
Caused by: org.apache.flink.util.FlinkException: Failed to rescale the job b8c8f1c473708fdb1a1c20c059f94a16. | |
... 22 common frames omitted | |
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Suspend needs to happen atomically | |
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) | |
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) | |
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975) | |
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) | |
... 19 common frames omitted | |
Caused by: java.lang.IllegalStateException: Suspend needs to happen atomically | |
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) | |
at org.apache.flink.runtime.executiongraph.ExecutionGraph.suspend(ExecutionGraph.java:1172) | |
at org.apache.flink.runtime.jobmaster.JobMaster.suspendExecutionGraph(JobMaster.java:1221) | |
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465) | |
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966) | |
... 20 common frames omitted | |
11:15:01.071 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Checkpointing parallel source -> Double value -> Sink: Print to Std. Out (1/1) (6e1aab310eaa02ff4982ea0184001319) switched from CREATED to CANCELING. | |
11:15:01.071 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Checkpointing parallel source -> Double value -> Sink: Print to Std. Out (1/1) (6e1aab310eaa02ff4982ea0184001319) switched from CANCELING to CANCELED. | |
11:15:01.072 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Try to restart or fail the job Test topology (b8c8f1c473708fdb1a1c20c059f94a16) if no longer possible. | |
11:15:01.072 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Test topology (b8c8f1c473708fdb1a1c20c059f94a16) switched from state FAILING to FAILED. | |
org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause. | |
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$8(JobMaster.java:504) | |
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) | |
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) | |
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) | |
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392) | |
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185) | |
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) | |
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147) | |
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) | |
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) | |
at akka.actor.Actor.aroundReceive(Actor.scala:502) | |
at akka.actor.Actor.aroundReceive$(Actor.scala:500) | |
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) | |
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) | |
at akka.actor.ActorCell.invoke(ActorCell.scala:495) | |
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) | |
at akka.dispatch.Mailbox.run(Mailbox.scala:224) | |
at akka.dispatch.Mailbox.exec(Mailbox.scala:234) | |
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) | |
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) | |
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) | |
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) | |
Caused by: org.apache.flink.util.FlinkException: Failed to rescale the job b8c8f1c473708fdb1a1c20c059f94a16. | |
... 22 common frames omitted | |
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Suspend needs to happen atomically | |
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) | |
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) | |
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975) | |
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) | |
... 19 common frames omitted | |
Caused by: java.lang.IllegalStateException: Suspend needs to happen atomically | |
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) | |
at org.apache.flink.runtime.executiongraph.ExecutionGraph.suspend(ExecutionGraph.java:1172) | |
at org.apache.flink.runtime.jobmaster.JobMaster.suspendExecutionGraph(JobMaster.java:1221) | |
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465) | |
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966) | |
... 20 common frames omitted | |
11:15:01.074 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Could not restart the job Test topology (b8c8f1c473708fdb1a1c20c059f94a16) because a type of SuppressRestartsException was thrown. | |
org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause. | |
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$8(JobMaster.java:504) | |
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) | |
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) | |
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) | |
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392) | |
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185) | |
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) | |
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147) | |
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) | |
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) | |
at akka.actor.Actor.aroundReceive(Actor.scala:502) | |
at akka.actor.Actor.aroundReceive$(Actor.scala:500) | |
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) | |
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) | |
at akka.actor.ActorCell.invoke(ActorCell.scala:495) | |
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) | |
at akka.dispatch.Mailbox.run(Mailbox.scala:224) | |
at akka.dispatch.Mailbox.exec(Mailbox.scala:234) | |
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) | |
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) | |
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) | |
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) | |
Caused by: org.apache.flink.util.FlinkException: Failed to rescale the job b8c8f1c473708fdb1a1c20c059f94a16. | |
... 22 common frames omitted | |
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Suspend needs to happen atomically | |
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) | |
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) | |
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975) | |
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) | |
... 19 common frames omitted | |
Caused by: java.lang.IllegalStateException: Suspend needs to happen atomically | |
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) | |
at org.apache.flink.runtime.executiongraph.ExecutionGraph.suspend(ExecutionGraph.java:1172) | |
at org.apache.flink.runtime.jobmaster.JobMaster.suspendExecutionGraph(JobMaster.java:1221) | |
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465) | |
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966) | |
... 20 common frames omitted | |
11:15:01.074 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job b8c8f1c473708fdb1a1c20c059f94a16. | |
11:15:01.074 [flink-akka.actor.default-dispatcher-31] INFO o.a.f.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down | |
11:15:01.074 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.checkpoint.CompletedCheckpoint - Checkpoint with ID 586 at 'file:/tmp/savepoint-b8c8f1-008d657fefe6' not discarded. | |
11:15:06.527 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job recovers via failover strategy: full graph restart | |
11:15:06.527 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.jobmaster.JobMaster - Running initialization on master for job Test topology (b8c8f1c473708fdb1a1c20c059f94a16). | |
11:15:06.527 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.jobmaster.JobMaster - Successfully ran initialization on master in 0 ms. | |
11:15:06.530 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'totalNumberOfCheckpoints'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology] | |
11:15:06.530 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'numberOfInProgressCheckpoints'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology] | |
11:15:06.530 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'numberOfCompletedCheckpoints'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology] | |
11:15:06.530 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'numberOfFailedCheckpoints'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology] | |
11:15:06.530 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointRestoreTimestamp'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology] | |
11:15:06.530 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointSize'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology] | |
11:15:06.530 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointDuration'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology] | |
11:15:06.530 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointAlignmentBuffered'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology] | |
11:15:06.530 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'lastCheckpointExternalPath'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology] | |
11:15:06.531 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.jobmaster.JobMaster - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'file:/tmp', asynchronous: TRUE, maxStateSize: 5242880) | |
11:15:06.531 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'restartingTime'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology] | |
11:15:06.531 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'downtime'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology] | |
11:15:06.531 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'uptime'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology] | |
11:15:06.531 [flink-akka.actor.default-dispatcher-31] WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'fullRestarts'. Metric will not be reported.[flink-jobmanager, jobmanager, Test topology] | |
11:15:06.531 [flink-akka.actor.default-dispatcher-31] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint triggering task Source: Checkpointing parallel source -> Double value -> Sink: Print to Std. Out (1/1) of job b8c8f1c473708fdb1a1c20c059f94a16 is not in state RUNNING but CANCELED instead. Aborting checkpoint. | |
11:15:06.532 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Starting job b8c8f1c473708fdb1a1c20c059f94a16 from savepoint file:/tmp/savepoint-b8c8f1-008d657fefe6 () | |
11:15:06.532 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Reset the checkpoint ID of job b8c8f1c473708fdb1a1c20c059f94a16 to 587. | |
11:15:06.532 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring job b8c8f1c473708fdb1a1c20c059f94a16 from latest valid checkpoint: Checkpoint 586 @ 0 for b8c8f1c473708fdb1a1c20c059f94a16. | |
11:15:06.533 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - No master state to restore | |
11:15:47.200 [flink-akka.actor.default-dispatcher-21] INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Releasing idle slot [aafb18441a1c233c209a2d2b3c3821ca]. | |
11:16:37.152 [flink-akka.actor.default-dispatcher-38] INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Releasing idle slot [f9109d62cd3331f2ea4a341259769fed]. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package myflink; | |
import java.io.Serializable; | |
import java.util.ArrayList; | |
import java.util.Iterator; | |
import java.util.List; | |
import java.util.Locale; | |
import java.util.Random; | |
import org.apache.flink.api.common.functions.MapFunction; | |
import org.apache.flink.api.common.functions.RuntimeContext; | |
import org.apache.flink.api.common.state.ListState; | |
import org.apache.flink.api.common.state.ListStateDescriptor; | |
import org.apache.flink.api.java.utils.ParameterTool; | |
import org.apache.flink.runtime.state.FunctionInitializationContext; | |
import org.apache.flink.runtime.state.FunctionSnapshotContext; | |
import org.apache.flink.streaming.api.CheckpointingMode; | |
import org.apache.flink.streaming.api.TimeCharacteristic; | |
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; | |
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; | |
import org.apache.flink.streaming.api.functions.source.SourceFunction; | |
import org.apache.flink.streaming.api.watermark.Watermark; | |
public class StreamingJob { | |
public static void main(String[] args) throws Exception { | |
ParameterTool config = ParameterTool.fromArgs(args); | |
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
env.getConfig().setGlobalJobParameters(config); | |
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); | |
final boolean watermarking = !("enabled".equalsIgnoreCase(config.get("watermarking", "enabled"))); | |
SingleOutputStreamOperator<Long> stream; | |
switch (config.get("source", "default").toLowerCase(Locale.ENGLISH)) { | |
default: | |
stream = env.addSource(new MySource(watermarking)) | |
.uid("1").name("Checkpointing source"); | |
break; | |
case "parallel": | |
stream = env.addSource(new MyParallelSource(watermarking)) | |
.uid("2").name("Checkpointing parallel source"); | |
break; | |
case "iterator": | |
stream = env.fromCollection(new MyIterator(), Long.class).uid("3").name("Iterator source"); | |
break; | |
} | |
if (!"none".equalsIgnoreCase(config.get("map", "double"))) { | |
stream = stream.map(new DoubleMapFunction()).uid("4").name("Double value"); | |
} | |
stream.print(); | |
switch (config.get("checkpointing", "exactly-once").toLowerCase(Locale.ENGLISH)) { | |
case "exactly-once": | |
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); | |
break; | |
case "at-least-once": | |
env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE); | |
break; | |
default: | |
break; | |
} | |
env.execute("Test topology"); | |
} | |
public static class DoubleMapFunction implements MapFunction<Long, Long> { | |
private static final long serialVersionUID = 1L; | |
@Override | |
public Long map(Long v) { | |
return v == null ? null : v * v; | |
} | |
} | |
public static class ParallelState implements Serializable { | |
private static final long serialVersionUID = 1L; | |
public final long count; | |
public final int buckets; | |
public final int index; | |
public ParallelState(long count, int buckets, int index) { | |
this.count = count; | |
this.buckets = buckets; | |
this.index = index; | |
} | |
} | |
public static class MyParallelSource extends RichParallelSourceFunction<Long> implements CheckpointedFunction { | |
private static final long serialVersionUID = 1L; | |
private final boolean watermarking; | |
private long count = 0L; | |
private List<ParallelState> recovery = null; | |
private volatile boolean isRunning = true; | |
private transient ListState<ParallelState> checkpointedCount; | |
public MyParallelSource(boolean watermarking) { | |
this.watermarking = watermarking; | |
} | |
public void initializeState(FunctionInitializationContext context) throws Exception { | |
this.checkpointedCount = context | |
.getOperatorStateStore() | |
.getListState(new ListStateDescriptor<>("count", ParallelState.class)); | |
if (context.isRestored()) { | |
count = 0; | |
recovery = new ArrayList<>(); | |
for (ParallelState state : this.checkpointedCount.get()) { | |
this.count = Math.max(this.count, state.count); | |
recovery.add(state); | |
} | |
if (recovery.size() <= 1) { | |
recovery = null; | |
} | |
} | |
} | |
public void snapshotState(FunctionSnapshotContext context) throws Exception { | |
this.checkpointedCount.clear(); | |
final RuntimeContext ctx = getRuntimeContext(); | |
if (recovery != null) { | |
this.checkpointedCount.addAll(recovery); | |
} else { | |
this.checkpointedCount.add( | |
new ParallelState( | |
count, | |
ctx.getNumberOfParallelSubtasks(), | |
ctx.getIndexOfThisSubtask() | |
) | |
); | |
} | |
} | |
@Override | |
public void run(SourceContext<Long> ctx) throws Exception { | |
Random entropy = new Random(); | |
long lastWatermark = 0; | |
final RuntimeContext context = getRuntimeContext(); | |
while (isRunning) { | |
// this synchronized block ensures that state checkpointing, | |
// internal state updates and emission of elements are an atomic operation | |
synchronized (ctx.getCheckpointLock()) { | |
long now = System.currentTimeMillis(); | |
if (recovery != null && !recovery.isEmpty()) { | |
final ParallelState s = recovery.remove(0); | |
ctx.collectWithTimestamp(s.count, now); | |
// short-cut when restoring state | |
if (s.count + s.buckets < count) { | |
recovery.add(new ParallelState(s.count + s.buckets, s.buckets, s.index)); | |
} | |
} | |
if (recovery != null && recovery.isEmpty()) { | |
recovery = null; | |
} | |
if (recovery == null) { | |
if (count % context.getNumberOfParallelSubtasks() == context.getIndexOfThisSubtask()) { | |
if (watermarking) { | |
ctx.collectWithTimestamp(count, now); | |
if (now - lastWatermark > 1000L) { | |
ctx.emitWatermark(new Watermark(lastWatermark)); | |
} | |
} | |
ctx.collect(count); | |
} | |
count++; | |
} | |
} | |
ctx.markAsTemporarilyIdle(); | |
try { | |
Thread.sleep(200 + entropy.nextInt(100)); | |
} catch (InterruptedException e) { | |
// ignore | |
} | |
} | |
} | |
@Override | |
public void cancel() { | |
isRunning = false; | |
} | |
} | |
public static class MySource implements SourceFunction<Long>, CheckpointedFunction { | |
private static final long serialVersionUID = 1L; | |
private final boolean watermarking; | |
private long count = 0L; | |
private volatile boolean isRunning = true; | |
private transient ListState<Long> checkpointedCount; | |
public MySource(boolean watermarking) { | |
this.watermarking = watermarking; | |
} | |
public void run(SourceContext<Long> ctx) { | |
Random entropy = new Random(); | |
long lastWatermark = 0; | |
while (isRunning) { | |
// this synchronized block ensures that state checkpointing, | |
// internal state updates and emission of elements are an atomic operation | |
synchronized (ctx.getCheckpointLock()) { | |
if (watermarking) { | |
long now = System.currentTimeMillis(); | |
ctx.collectWithTimestamp(count, now); | |
if (now - lastWatermark > 1000L) { | |
ctx.emitWatermark(new Watermark(lastWatermark)); | |
} | |
} | |
ctx.collect(count); | |
count++; | |
} | |
ctx.markAsTemporarilyIdle(); | |
try { | |
Thread.sleep(200 + entropy.nextInt(100)); | |
} catch (InterruptedException e) { | |
// ignore | |
} | |
} | |
} | |
public void cancel() { | |
isRunning = false; | |
} | |
public void initializeState(FunctionInitializationContext context) throws Exception { | |
this.checkpointedCount = context | |
.getOperatorStateStore() | |
.getListState(new ListStateDescriptor<>("count", Long.class)); | |
if (context.isRestored()) { | |
for (Long count : this.checkpointedCount.get()) { | |
this.count = count; | |
} | |
} | |
} | |
public void snapshotState(FunctionSnapshotContext context) throws Exception { | |
this.checkpointedCount.clear(); | |
this.checkpointedCount.add(count); | |
} | |
} | |
private static class MyIterator implements Iterator<Long>, Serializable { | |
private static final long serialVersionUID = 1L; | |
private long value; | |
@Override | |
public boolean hasNext() { | |
return true; | |
} | |
@Override | |
public Long next() { | |
try { | |
Thread.sleep(250); | |
} catch (InterruptedException e) { | |
// ignore | |
} | |
return value++; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment