/*
 * Copyright 2018 Confluent Inc.
 *
 * Licensed under the Confluent Community License (the "License"); you may not use
 * this file except in compliance with the License. You may obtain a copy of the
 * License at
 *
 * http://www.confluent.io/confluent-community-license
 *
 * Unless required by applicable law or agreed to in writing, software
Vertx vertx = Vertx.vertx();
HttpClient client = vertx.createHttpClient();
HttpServer server = vertx.createHttpServer();
server.requestHandler(req -> {
  Trace trace = Trace.create(req);
  HttpClientRequest clientReq = client.get("some-uri", ar -> {
    trace.close();
  });
  trace.propagateTo(clientReq);
Found one Java-level deadlock:
=============================
"vert.x-eventloop-thread-3":
SUREFIRE-859:   waiting to lock monitor 0x00007f32040062c8 (object 0x00000000e7e52ea0, a io.mewbase.server.impl.log.LogImpl),
  which is held by "vert.x-eventloop-thread-2"
"vert.x-eventloop-thread-2":
SUREFIRE-859:   waiting to lock monitor 0x00007f3204006218 (object 0x00000000ebacde70, a io.mewbase.server.impl.log.LogReadStreamImpl),
  which is held by "vert.x-eventloop-thread-3"
Java stack information for the threads listed above:
===================================================
@Test
@Repeat(times=1000)
public void testAsyncFileConcurrency() throws Exception {
  String fileName = "some-file.dat";
  AtomicReference<AsyncFile> arFile = new AtomicReference<>();
  CountDownLatch latch = new CountDownLatch(1);
  vertx.fileSystem().open(testDir + pathSep + fileName, new OpenOptions(), ar -> {
    if (ar.succeeded()) {
      AsyncFile af = ar.result();
diff --git a/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java b/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java
index c9f8992..301bde9 100644
--- a/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java
+++ b/src/main/java/io/vertx/core/file/impl/AsyncFileImpl.java
@@ -43,6 +43,7 @@ import java.util.HashSet;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
public void testFoo() {
  Vertx vertx = Vertx.vertx();
  HttpClient client = vertx.createHttpClient();
  getWithFuture(client, "url1")
    .thenCompose(resp1 -> getWithFuture(client, "url2/" + resp1.statusMessage()))
    .thenCompose(resp2 -> getWithFuture(client, "url3/" + resp2.statusMessage()))
    .whenComplete(((resp3, t) -> {
      if (t != null) {
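
The preview above is cut off, so here is a minimal, JDK-only sketch of the same `thenCompose` chaining pattern. It does not use Vert.x: `fakeGet` is a hypothetical stand-in for `getWithFuture` that completes immediately with a made-up status string, and the class name `ComposeSketch` is also an assumption introduced for illustration.

```java
import java.util.concurrent.CompletableFuture;

public class ComposeSketch {
    // Hypothetical stand-in for an asynchronous HTTP GET (not a Vert.x API).
    // It completes immediately with a fake status message derived from the URL.
    static CompletableFuture<String> fakeGet(String url) {
        return CompletableFuture.completedFuture("OK(" + url + ")");
    }

    // Each step starts only after the previous one completes and uses its result
    // to build the next URL, mirroring the three chained calls in the preview.
    static CompletableFuture<String> chain() {
        return fakeGet("url1")
            .thenCompose(r1 -> fakeGet("url2/" + r1))
            .thenCompose(r2 -> fakeGet("url3/" + r2));
    }

    public static void main(String[] args) {
        chain().whenComplete((r3, t) -> {
            if (t != null) {
                // A failure in any earlier stage skips the later stages and lands here.
                System.err.println("failed: " + t);
            } else {
                System.out.println(r3);
            }
        });
    }
}
```

Because `thenCompose` flattens the nested future, a failure in any stage propagates to the final `whenComplete` without extra error handling at each step.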
vertx.executeBlocking(fut -> {
  Thread worker = Thread.currentThread();
  AtomicBoolean complete = new AtomicBoolean();
  long id = vertx.setTimer(TIMEOUT, tid -> {
    if (!complete.get()) {
      worker.interrupt();
    }
  });
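
The preview above stops before the blocking work runs, so the following is only a JDK-only sketch of the same interrupt-on-timeout idea. It is not the gist's code: a `ScheduledExecutorService` stands in for `vertx.setTimer`, and `InterruptOnTimeout`, `BlockingTask`, and `runWithTimeout` are hypothetical names introduced here. The `synchronized` blocks are an addition, assumed necessary to stop a late interrupt from reaching the caller after the task has finished.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptOnTimeout {
    // Hypothetical task type: blocking work that responds to interruption.
    interface BlockingTask {
        void run() throws InterruptedException;
    }

    // Runs the task on the calling thread. Returns true if it finished in time,
    // false if the watchdog interrupted it after timeoutMillis.
    static boolean runWithTimeout(BlockingTask task, long timeoutMillis) {
        Thread worker = Thread.currentThread();
        AtomicBoolean complete = new AtomicBoolean();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> watchdog = timer.schedule(() -> {
            // Check-and-interrupt under a lock so it cannot interleave with completion.
            synchronized (complete) {
                if (!complete.get()) {
                    worker.interrupt();
                }
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        try {
            task.run();
            return true;
        } catch (InterruptedException e) {
            return false;
        } finally {
            synchronized (complete) {
                complete.set(true);
                // Clear any interrupt that arrived just before completion was recorded.
                Thread.interrupted();
            }
            watchdog.cancel(false);
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithTimeout(() -> Thread.sleep(10), 2000));
        System.out.println(runWithTimeout(() -> Thread.sleep(5000), 50));
    }
}
```

This only helps when the blocking call actually responds to `Thread.interrupt()`; code that ignores interruption will keep running past the timeout.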
private void doIt() {
  CompletableFuture<String> cf = execute(this::findBook);
  // Now that you have a CompletableFuture, you can use it like any other CompletableFuture
  cf.whenComplete((s, t) -> {
    // CF callbacks will always be called on the correct Vert.x thread, no need to worry about
    // fork-join pools or anything like that
  });
CompletableFuture<String> cf = execute(this::findBook);
// Now that you have a CompletableFuture, you can use it like any other CompletableFuture
cf.whenComplete((s, t) -> {
  // CF callbacks will always be called on the correct Vert.x thread, no need to worry about
  // fork-join pools or anything like that
});
// Some arbitrary blocking method which returns a String
AsyncResCF<String> ar = new AsyncResCF<>();
vertx.executeBlocking(fut -> {
  // Your blocking code in here
}, ar);
ar.whenComplete((s, t) -> {
  // CF callbacks will always be called on the correct Vert.x thread, no need to worry about
  // fork-join pools or anything like that
});