Created
September 11, 2024 16:31
-
-
Save vietj/02ce9dc89c8a1c11dabe8828f760f973 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/vertx-web-client/src/test/java/io/vertx/ext/web/client/WebClientTest.java b/vertx-web-client/src/test/java/io/vertx/ext/web/client/WebClientTest.java | |
index 4426909..11761b6 100644 | |
--- a/vertx-web-client/src/test/java/io/vertx/ext/web/client/WebClientTest.java | |
+++ b/vertx-web-client/src/test/java/io/vertx/ext/web/client/WebClientTest.java | |
@@ -970,10 +970,10 @@ public class WebClientTest extends WebClientTestBase { | |
write(buffer, promise); | |
return promise.future(); | |
} | |
- public void write(Buffer buffer, Handler<AsyncResult<Void>> handler) { | |
+ public void write(Buffer buffer, Promise<Void> promise) { | |
received.addAndGet(buffer.length()); | |
- if (handler != null) { | |
- handler.handle(Future.succeededFuture()); | |
+ if (promise != null) { | |
+ promise.complete(); | |
} | |
} | |
public Future<Void> close() { | |
diff --git a/vertx-web-graphql/src/test/java/io/vertx/ext/web/handler/graphql/GraphQLRequest.java b/vertx-web-graphql/src/test/java/io/vertx/ext/web/handler/graphql/GraphQLRequest.java | |
index 0a6673b..0b30e4e 100644 | |
--- a/vertx-web-graphql/src/test/java/io/vertx/ext/web/handler/graphql/GraphQLRequest.java | |
+++ b/vertx-web-graphql/src/test/java/io/vertx/ext/web/handler/graphql/GraphQLRequest.java | |
@@ -129,6 +129,10 @@ public class GraphQLRequest { | |
return this; | |
} | |
+ void send(HttpClient client, Promise<JsonObject> handler) throws Exception { | |
+ send(client, 200, handler::handle); | |
+ } | |
+ | |
void send(HttpClient client, Handler<AsyncResult<JsonObject>> handler) throws Exception { | |
send(client, 200, handler); | |
} | |
diff --git a/vertx-web/src/main/java/io/vertx/ext/web/handler/impl/ChainAuthHandlerImpl.java b/vertx-web/src/main/java/io/vertx/ext/web/handler/impl/ChainAuthHandlerImpl.java | |
index b2fb515..5d1c16a 100644 | |
--- a/vertx-web/src/main/java/io/vertx/ext/web/handler/impl/ChainAuthHandlerImpl.java | |
+++ b/vertx-web/src/main/java/io/vertx/ext/web/handler/impl/ChainAuthHandlerImpl.java | |
@@ -67,7 +67,7 @@ public class ChainAuthHandlerImpl extends AuthenticationHandlerImpl<Authenticati | |
} | |
} | |
- private void iterate(final int idx, final RoutingContext ctx, User result, Throwable exception, Handler<AsyncResult<User>> handler) { | |
+ private void iterate(final int idx, final RoutingContext ctx, User result, Throwable exception, Promise<User> handler) { | |
// stop condition | |
if (idx >= handlers.size()) { | |
if (all) { | |
diff --git a/vertx-web/src/main/java/io/vertx/ext/web/handler/impl/SessionHandlerImpl.java b/vertx-web/src/main/java/io/vertx/ext/web/handler/impl/SessionHandlerImpl.java | |
index 75220da..61a298c 100644 | |
--- a/vertx-web/src/main/java/io/vertx/ext/web/handler/impl/SessionHandlerImpl.java | |
+++ b/vertx-web/src/main/java/io/vertx/ext/web/handler/impl/SessionHandlerImpl.java | |
@@ -434,7 +434,7 @@ public class SessionHandlerImpl implements SessionHandler { | |
return context.succeededFuture(session); | |
}); | |
} | |
- private void doGetSession(Vertx vertx, long startTime, String sessionID, Handler<AsyncResult<Session>> resultHandler) { | |
+ private void doGetSession(Vertx vertx, long startTime, String sessionID, Promise<Session> resultHandler) { | |
sessionStore.get(sessionID) | |
.onComplete(res -> { | |
if (res.succeeded()) { | |
diff --git a/vertx-web/src/main/java/io/vertx/ext/web/handler/sockjs/impl/SockJSSession.java b/vertx-web/src/main/java/io/vertx/ext/web/handler/sockjs/impl/SockJSSession.java | |
index 306bf79..a4c2927 100644 | |
--- a/vertx-web/src/main/java/io/vertx/ext/web/handler/sockjs/impl/SockJSSession.java | |
+++ b/vertx-web/src/main/java/io/vertx/ext/web/handler/sockjs/impl/SockJSSession.java | |
@@ -73,7 +73,7 @@ class SockJSSession extends SockJSSocketBase implements Shareable { | |
private final long timeout; | |
private final Handler<SockJSSocket> sockHandler; | |
private final long heartbeatID; | |
- private final List<Handler<AsyncResult<Void>>> writeAcks = new ArrayList<>(); | |
+ private final List<Promise<Void>> writeAcks = new ArrayList<>(); | |
private TransportListener listener; | |
private boolean closed; | |
private boolean openWritten; | |
@@ -308,7 +308,7 @@ class SockJSSession extends SockJSSocketBase implements Shareable { | |
final TransportListener listener = this.listener; | |
if (listener != null) { | |
final String json; | |
- final List<Handler<AsyncResult<Void>>> acks; | |
+ final List<Promise<Void>> acks; | |
synchronized (this) { | |
if (!pendingWrites.isEmpty()) { | |
json = JsonCodec.encode(pendingWrites.toArray(new String[0])); | |
diff --git a/vertx-web/src/main/java/io/vertx/ext/web/healthchecks/impl/HealthCheckHandlerImpl.java b/vertx-web/src/main/java/io/vertx/ext/web/healthchecks/impl/HealthCheckHandlerImpl.java | |
index 3d85990..90fdd14 100644 | |
--- a/vertx-web/src/main/java/io/vertx/ext/web/healthchecks/impl/HealthCheckHandlerImpl.java | |
+++ b/vertx-web/src/main/java/io/vertx/ext/web/healthchecks/impl/HealthCheckHandlerImpl.java | |
@@ -116,7 +116,7 @@ public class HealthCheckHandlerImpl implements HealthCheckHandler { | |
if (this.resultMapper != null) { | |
Promise<CheckResult> promise = Promise.promise(); | |
promise.future().flatMap(resultMapper).onComplete(handler); | |
- return promise; | |
+ return promise::handle; | |
} | |
return handler; | |
} | |
diff --git a/src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java b/src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java | |
index 8aab6c2..178c538 100644 | |
--- a/src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java | |
+++ b/src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java | |
@@ -154,7 +154,7 @@ public class RedisClusterClient extends BaseRedisClient implements Redis { | |
return promise.future(); | |
} | |
- private void connect(Slots slots, Handler<AsyncResult<RedisConnection>> onConnected) { | |
+ private void connect(Slots slots, Promise<RedisConnection> onConnected) { | |
// create a cluster connection | |
final Set<Throwable> failures = ConcurrentHashMap.newKeySet(); | |
final AtomicInteger counter = new AtomicInteger(); | |
@@ -180,7 +180,7 @@ public class RedisClusterClient extends BaseRedisClient implements Redis { | |
} | |
private void connectionComplete(AtomicInteger counter, Slots slots, Map<String, PooledRedisConnection> connections, | |
- Set<Throwable> failures, Handler<AsyncResult<RedisConnection>> onConnected) { | |
+ Set<Throwable> failures, Promise<RedisConnection> onConnected) { | |
if (counter.incrementAndGet() == slots.endpoints().length) { | |
// end condition | |
if (!failures.isEmpty()) { | |
diff --git a/src/main/java/io/vertx/redis/client/impl/RedisClusterConnection.java b/src/main/java/io/vertx/redis/client/impl/RedisClusterConnection.java | |
index ecf8395..42a3b15 100644 | |
--- a/src/main/java/io/vertx/redis/client/impl/RedisClusterConnection.java | |
+++ b/src/main/java/io/vertx/redis/client/impl/RedisClusterConnection.java | |
@@ -266,7 +266,7 @@ public class RedisClusterConnection implements RedisConnection { | |
return map; | |
} | |
- void send(String endpoint, int retries, Request command, Handler<AsyncResult<Response>> handler) { | |
+ void send(String endpoint, int retries, Request command, Promise<Response> handler) { | |
PooledRedisConnection connection = connections.get(endpoint); | |
if (connection == null) { | |
connectionManager.getConnection(endpoint, RedisReplicas.NEVER != connectOptions.getUseReplicas() ? Request.cmd(Command.READONLY) : null) | |
@@ -319,13 +319,14 @@ public class RedisClusterConnection implements RedisConnection { | |
} | |
String newEndpoint = uri.protocol() + "://" + uri.userinfo() + addr; | |
if (ask) { | |
- send(newEndpoint, retries - 1, Request.cmd(Command.ASKING), resp -> { | |
- if (resp.failed()) { | |
- handler.handle(Future.failedFuture("Failed ASKING: " + resp.cause() + ", caused by " + cause)); | |
- } else { | |
- send(newEndpoint, retries - 1, command, handler); | |
- } | |
- }); | |
+ Future.<Response>future(p -> send(newEndpoint, retries - 1, Request.cmd(Command.ASKING), p)) | |
+ .onComplete(resp -> { | |
+ if (resp.failed()) { | |
+ handler.handle(Future.failedFuture("Failed ASKING: " + resp.cause() + ", caused by " + cause)); | |
+ } else { | |
+ send(newEndpoint, retries - 1, command, handler); | |
+ } | |
+ }); | |
} else { | |
send(newEndpoint, retries - 1, command, handler); | |
} | |
@@ -447,7 +448,7 @@ public class RedisClusterConnection implements RedisConnection { | |
return promise.future(); | |
} | |
- private void batch(String endpoint, int retries, List<Request> commands, Handler<AsyncResult<List<Response>>> handler) { | |
+ private void batch(String endpoint, int retries, List<Request> commands, Promise<List<Response>> handler) { | |
RedisConnection connection = connections.get(endpoint); | |
if (connection == null) { | |
connectionManager.getConnection(endpoint, RedisReplicas.NEVER != connectOptions.getUseReplicas() ? Request.cmd(Command.READONLY) : null) | |
@@ -500,13 +501,14 @@ public class RedisClusterConnection implements RedisConnection { | |
} | |
String newEndpoint = uri.protocol() + "://" + uri.userinfo() + addr; | |
if (ask) { | |
- batch(newEndpoint, retries - 1, Collections.singletonList(Request.cmd(Command.ASKING)), resp -> { | |
- if (resp.failed()) { | |
- handler.handle(Future.failedFuture("Failed ASKING: " + resp.cause() + ", caused by " + cause)); | |
- } else { | |
- batch(newEndpoint, retries - 1, commands, handler); | |
- } | |
- }); | |
+ Future.<List<Response>>future(p -> batch(newEndpoint, retries - 1, Collections.singletonList(Request.cmd(Command.ASKING)), p)) | |
+ .onComplete(resp -> { | |
+ if (resp.failed()) { | |
+ handler.handle(Future.failedFuture("Failed ASKING: " + resp.cause() + ", caused by " + cause)); | |
+ } else { | |
+ batch(newEndpoint, retries - 1, commands, handler); | |
+ } | |
+ }); | |
} else { | |
batch(newEndpoint, retries - 1, commands, handler); | |
} | |
diff --git a/src/main/java/io/vertx/redis/client/impl/RedisClusterImpl.java b/src/main/java/io/vertx/redis/client/impl/RedisClusterImpl.java | |
index 243c724..10d2400 100644 | |
--- a/src/main/java/io/vertx/redis/client/impl/RedisClusterImpl.java | |
+++ b/src/main/java/io/vertx/redis/client/impl/RedisClusterImpl.java | |
@@ -83,14 +83,15 @@ public class RedisClusterImpl implements RedisCluster { | |
return; | |
} | |
- conn.send(endpoints[index], RedisClusterConnection.RETRIES, request, ar -> { | |
- if (ar.succeeded()) { | |
- result.add(ar.result()); | |
- onAllNodes(endpoints, index + 1, request, result, conn, promise); | |
- } else { | |
- promise.fail(ar.cause()); | |
- } | |
- }); | |
+ Future.<Response>future(p -> conn.send(endpoints[index], RedisClusterConnection.RETRIES, request, p)) | |
+ .onComplete(ar -> { | |
+ if (ar.succeeded()) { | |
+ result.add(ar.result()); | |
+ onAllNodes(endpoints, index + 1, request, result, conn, promise); | |
+ } else { | |
+ promise.fail(ar.cause()); | |
+ } | |
+ }); | |
} | |
@Override | |
diff --git a/src/main/java/io/vertx/redis/client/impl/RedisReplicationClient.java b/src/main/java/io/vertx/redis/client/impl/RedisReplicationClient.java | |
index 4c0480e..e90c448 100644 | |
--- a/src/main/java/io/vertx/redis/client/impl/RedisReplicationClient.java | |
+++ b/src/main/java/io/vertx/redis/client/impl/RedisReplicationClient.java | |
@@ -117,7 +117,7 @@ public class RedisReplicationClient extends BaseRedisClient implements Redis { | |
return promise.future(); | |
} | |
- private void connectWithDiscoverTopology(List<String> endpoints, int index, Set<Throwable> failures, Handler<AsyncResult<RedisConnection>> onConnect) { | |
+ private void connectWithDiscoverTopology(List<String> endpoints, int index, Set<Throwable> failures, Promise<RedisConnection> onConnect) { | |
if (index >= endpoints.size()) { | |
// stop condition | |
StringBuilder message = new StringBuilder("Cannot connect to any of the provided endpoints"); | |
diff --git a/src/main/java/io/vertx/redis/client/impl/RedisSentinelClient.java b/src/main/java/io/vertx/redis/client/impl/RedisSentinelClient.java | |
index 61d67ad..6ad3e5b 100644 | |
--- a/src/main/java/io/vertx/redis/client/impl/RedisSentinelClient.java | |
+++ b/src/main/java/io/vertx/redis/client/impl/RedisSentinelClient.java | |
@@ -20,6 +20,8 @@ import io.vertx.core.Future; | |
import io.vertx.core.Handler; | |
import io.vertx.core.Promise; | |
import io.vertx.core.Vertx; | |
+import io.vertx.core.internal.ContextInternal; | |
+import io.vertx.core.internal.PromiseInternal; | |
import io.vertx.core.internal.logging.Logger; | |
import io.vertx.core.internal.logging.LoggerFactory; | |
import io.vertx.core.net.NetClientOptions; | |
@@ -73,7 +75,7 @@ public class RedisSentinelClient extends BaseRedisClient implements Redis { | |
public Future<RedisConnection> connect() { | |
final Promise<RedisConnection> promise = vertx.promise(); | |
- createConnectionInternal(connectOptions, connectOptions.getRole(), createConnection -> { | |
+ createConnectionInternal(vertx.getOrCreateContext(), connectOptions, connectOptions.getRole()).onComplete(createConnection -> { | |
if (createConnection.failed()) { | |
promise.fail(createConnection.cause()); | |
return; | |
@@ -125,12 +127,12 @@ public class RedisSentinelClient extends BaseRedisClient implements Redis { | |
} | |
private Future<PooledRedisConnection> createConnectionInternal(RedisRole role) { | |
- Promise<PooledRedisConnection> promise = Promise.promise(); | |
- createConnectionInternal(connectOptions, role, promise); | |
- return promise.future(); | |
+ return createConnectionInternal(vertx.getOrCreateContext(), connectOptions, role); | |
} | |
- private void createConnectionInternal(RedisSentinelConnectOptions options, RedisRole role, Handler<AsyncResult<PooledRedisConnection>> onCreate) { | |
+ private Future<PooledRedisConnection> createConnectionInternal(ContextInternal ctx, RedisSentinelConnectOptions options, RedisRole role) { | |
+ | |
+ PromiseInternal<PooledRedisConnection> onCreate = ctx.promise(); | |
final Handler<AsyncResult<RedisURI>> createAndConnect = resolve -> { | |
if (resolve.failed()) { | |
@@ -164,6 +166,8 @@ public class RedisSentinelClient extends BaseRedisClient implements Redis { | |
resolveClient(this::getReplicaFromEndpoint, options, createAndConnect); | |
break; | |
} | |
+ | |
+ return onCreate.future(); | |
} | |
/** | |
diff --git a/src/main/java/io/vertx/redis/client/impl/SharedSlots.java b/src/main/java/io/vertx/redis/client/impl/SharedSlots.java | |
index b095157..16f7d70 100644 | |
--- a/src/main/java/io/vertx/redis/client/impl/SharedSlots.java | |
+++ b/src/main/java/io/vertx/redis/client/impl/SharedSlots.java | |
@@ -55,7 +55,7 @@ class SharedSlots { | |
} | |
} | |
- private void getSlots(List<String> endpoints, int index, Set<Throwable> failures, Handler<AsyncResult<Slots>> onGotSlots) { | |
+ private void getSlots(List<String> endpoints, int index, Set<Throwable> failures, Promise<Slots> onGotSlots) { | |
if (index >= endpoints.size()) { | |
// stop condition | |
StringBuilder message = new StringBuilder("Cannot connect to any of the provided endpoints"); | |
diff --git a/src/main/java/io/vertx/httpproxy/impl/ProxiedRequest.java b/src/main/java/io/vertx/httpproxy/impl/ProxiedRequest.java | |
index 931f00f..b7e68e4 100644 | |
--- a/src/main/java/io/vertx/httpproxy/impl/ProxiedRequest.java | |
+++ b/src/main/java/io/vertx/httpproxy/impl/ProxiedRequest.java | |
@@ -146,7 +146,7 @@ public class ProxiedRequest implements ProxyRequest { | |
return new ProxiedResponse(this, proxiedRequest.response()); | |
} | |
- void sendRequest(Handler<AsyncResult<ProxyResponse>> responseHandler) { | |
+ void sendRequest(Promise<ProxyResponse> responseHandler) { | |
request.response().<ProxyResponse>map(r -> { | |
r.pause(); // Pause it | |
diff --git a/src/main/java/io/vertx/httpproxy/impl/ProxiedResponse.java b/src/main/java/io/vertx/httpproxy/impl/ProxiedResponse.java | |
index 5d0690f..36a1d20 100644 | |
--- a/src/main/java/io/vertx/httpproxy/impl/ProxiedResponse.java | |
+++ b/src/main/java/io/vertx/httpproxy/impl/ProxiedResponse.java | |
@@ -172,7 +172,7 @@ class ProxiedResponse implements ProxyResponse { | |
return promise.future(); | |
} | |
- public void send(Handler<AsyncResult<Void>> completionHandler) { | |
+ public void send(Promise<Void> completionHandler) { | |
// Set stuff | |
proxiedResponse.setStatusCode(statusCode); | |
@@ -261,7 +261,7 @@ class ProxiedResponse implements ProxyResponse { | |
return this; | |
} | |
- private void sendResponse(ReadStream<Buffer> body, Handler<AsyncResult<Void>> completionHandler) { | |
+ private void sendResponse(ReadStream<Buffer> body, Promise<Void> completionHandler) { | |
Pipe<Buffer> pipe = body.pipe(); | |
pipe.endOnSuccess(true); | |
pipe.endOnFailure(false); | |
diff --git a/src/test/java/io/vertx/ext/mongo/MongoClientTest.java b/src/test/java/io/vertx/ext/mongo/MongoClientTest.java | |
index 89af155..424d636 100644 | |
--- a/src/test/java/io/vertx/ext/mongo/MongoClientTest.java | |
+++ b/src/test/java/io/vertx/ext/mongo/MongoClientTest.java | |
@@ -177,7 +177,7 @@ public class MongoClientTest extends MongoClientTestBase { | |
CountDownLatch latch = new CountDownLatch(1); | |
AtomicReference<List<String>> foos = new AtomicReference<>(); | |
mongoClient.createCollection(collection).onComplete(onSuccess(res -> { | |
- insertDocs(mongoClient, collection, numDocs, onSuccess(res2 -> { | |
+ insertDocs(mongoClient, collection, numDocs).onComplete(onSuccess(res2 -> { | |
FindOptions findOptions = new FindOptions().setSort(new JsonObject().put("counter", 1)).setBatchSize(1); | |
ReadStream<JsonObject> stream = mongoClient.findBatchWithOptions(collection, new JsonObject(), findOptions); | |
streamReference.set(stream); | |
@@ -239,7 +239,7 @@ public class MongoClientTest extends MongoClientTestBase { | |
final CountDownLatch latch = new CountDownLatch(1); | |
final AtomicLong count = new AtomicLong(); | |
mongoClient.createCollection(collection).onComplete(onSuccess(res -> { | |
- insertDocs(mongoClient, collection, numDocs, onSuccess(res2 -> { | |
+ insertDocs(mongoClient, collection, numDocs).onComplete(onSuccess(res2 -> { | |
mongoClient.aggregate(collection, | |
new JsonArray().add(new JsonObject().put("$match", new JsonObject().put("foo", new JsonObject().put("$regex", "bar1")))) | |
.add(new JsonObject().put("$count", "foo_starting_with_bar1"))) | |
@@ -263,7 +263,7 @@ public class MongoClientTest extends MongoClientTestBase { | |
final CountDownLatch fetchLatch = new CountDownLatch(numDocs); | |
final CountDownLatch endLatch = new CountDownLatch(1); | |
final String collection = randomCollection(); | |
- insertDocs(mongoClient, collection, numDocs, onSuccess(res -> { | |
+ insertDocs(mongoClient, collection, numDocs).onComplete(onSuccess(res -> { | |
mongoClient.aggregateWithOptions(collection, pipeline, aggregateOptions) | |
.exceptionHandler(this::fail) | |
.handler(j -> fetchLatch.countDown()) | |
diff --git a/src/test/java/io/vertx/ext/mongo/MongoTestBase.java b/src/test/java/io/vertx/ext/mongo/MongoTestBase.java | |
index b325267..1e23323 100644 | |
--- a/src/test/java/io/vertx/ext/mongo/MongoTestBase.java | |
+++ b/src/test/java/io/vertx/ext/mongo/MongoTestBase.java | |
@@ -19,6 +19,7 @@ package io.vertx.ext.mongo; | |
import io.vertx.core.AsyncResult; | |
import io.vertx.core.Future; | |
import io.vertx.core.Handler; | |
+import io.vertx.core.Promise; | |
import io.vertx.core.json.JsonArray; | |
import io.vertx.core.json.JsonObject; | |
import io.vertx.test.core.TestUtils; | |
@@ -126,7 +127,7 @@ public abstract class MongoTestBase extends VertxTestBase { | |
return "ext-mongo" + TestUtils.randomAlphaString(20); | |
} | |
- protected void insertDocs(MongoClient mongoClient, String collection, int num, Handler<AsyncResult<Void>> resultHandler) { | |
+ protected void insertDocs(MongoClient mongoClient, String collection, int num, Promise<Void> resultHandler) { | |
insertDocs(mongoClient, collection, num, this::createDoc, resultHandler); | |
} | |
@@ -138,7 +139,7 @@ public abstract class MongoTestBase extends VertxTestBase { | |
return Future.future(h -> insertDocs(mongoClient, collection, num, docSupplier, h)); | |
} | |
- protected void insertDocs(MongoClient mongoClient, String collection, int num, Function<Integer, JsonObject> docSupplier, Handler<AsyncResult<Void>> resultHandler) { | |
+ protected void insertDocs(MongoClient mongoClient, String collection, int num, Function<Integer, JsonObject> docSupplier, Promise<Void> resultHandler) { | |
if (num != 0) { | |
AtomicInteger cnt = new AtomicInteger(); | |
for (int i = 0; i < num; i++) { | |
diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2SocketConnection.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2SocketConnection.java | |
index 63d8f4f..9893381 100644 | |
--- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2SocketConnection.java | |
+++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2SocketConnection.java | |
@@ -17,6 +17,7 @@ package io.vertx.db2client.impl; | |
import io.netty.channel.ChannelPipeline; | |
import io.vertx.core.AsyncResult; | |
+import io.vertx.core.Future; | |
import io.vertx.core.Handler; | |
import io.vertx.core.Promise; | |
import io.vertx.core.internal.ContextInternal; | |
@@ -85,7 +86,13 @@ public class DB2SocketConnection extends SocketConnectionBase { | |
if (txCmd.kind == TxCommand.Kind.BEGIN) { | |
// DB2 always implicitly starts a transaction with each query, and does | |
// not support the 'BEGIN' keyword. Instead we can no-op BEGIN commands | |
- cmd.handler = handler; | |
+ cmd.handler = (res, err) -> { | |
+ if (err != null) { | |
+ handler.handle(Future.failedFuture(err)); | |
+ } else { | |
+ handler.handle(Future.succeededFuture(res)); | |
+ } | |
+ }; | |
cmd.complete(CommandResponse.success(txCmd.result).toAsyncResult()); | |
} else { | |
SimpleQueryCommand<Void> cmd2 = new SimpleQueryCommand<>(txCmd.kind.sql, false, false, | |
diff --git a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleJdbcConnection.java b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleJdbcConnection.java | |
index 49b6ad8..17cf422 100644 | |
--- a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleJdbcConnection.java | |
+++ b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleJdbcConnection.java | |
@@ -173,7 +173,7 @@ public class OracleJdbcConnection implements Connection { | |
return promise.future(); | |
} | |
- private <R> void doSchedule(CommandBase<R> cmd, Handler<AsyncResult<R>> handler) { | |
+ private <R> void doSchedule(CommandBase<R> cmd, Promise<R> handler) { | |
cmd.handler = handler; | |
if (closePromise == null) { | |
pending.add(cmd); | |
diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/pubsub/PgSubscriberImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/pubsub/PgSubscriberImpl.java | |
index 2d3de59..83ab86a 100644 | |
--- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/pubsub/PgSubscriberImpl.java | |
+++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/pubsub/PgSubscriberImpl.java | |
@@ -106,7 +106,7 @@ public class PgSubscriberImpl implements PgSubscriber { | |
if (!closed) { | |
Long val = reconnectPolicy.apply(count); | |
if (val >= 0) { | |
- tryConnect(val, ar -> { | |
+ Future.<Void>future(p -> tryConnect(val, p)).onComplete(ar -> { | |
if (ar.failed()) { | |
checkReconnect(count + 1); | |
} | |
@@ -150,7 +150,7 @@ public class PgSubscriberImpl implements PgSubscriber { | |
return promise.future(); | |
} | |
- private void tryConnect(long delayMillis, Handler<AsyncResult<Void>> handler) { | |
+ private void tryConnect(long delayMillis, Promise<Void> handler) { | |
if (!connecting) { | |
connecting = true; | |
if (delayMillis > 0) { | |
@@ -161,11 +161,11 @@ public class PgSubscriberImpl implements PgSubscriber { | |
} | |
} | |
- private void doConnect(Handler<AsyncResult<Void>> completionHandler) { | |
+ private void doConnect(Promise<Void> completionHandler) { | |
PgConnection.connect(vertx, options).onComplete(ar -> handleConnectResult(completionHandler, ar)); | |
} | |
- private synchronized void handleConnectResult(Handler<AsyncResult<Void>> completionHandler, AsyncResult<PgConnection> ar1) { | |
+ private synchronized void handleConnectResult(Promise<Void> completionHandler, AsyncResult<PgConnection> ar1) { | |
connecting = false; | |
if (ar1.succeeded()) { | |
conn = ar1.result(); | |
diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SocketConnectionBase.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SocketConnectionBase.java | |
index 2cc46eb..0fc7d16 100644 | |
--- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SocketConnectionBase.java | |
+++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SocketConnectionBase.java | |
@@ -45,6 +45,7 @@ import io.vertx.sqlclient.spi.DatabaseMetadata; | |
import java.util.ArrayDeque; | |
import java.util.List; | |
+import java.util.function.BiConsumer; | |
import java.util.function.Predicate; | |
/** | |
@@ -207,6 +208,16 @@ public abstract class SocketConnectionBase implements Connection { | |
} | |
protected <R> void doSchedule(CommandBase<R> cmd, Handler<AsyncResult<R>> handler) { | |
+ doSchedule(cmd, (r, err) -> { | |
+ if (err == null) { | |
+ handler.handle(Future.succeededFuture(r)); | |
+ } else { | |
+ handler.handle(Future.failedFuture(err)); | |
+ } | |
+ }); | |
+ } | |
+ | |
+ protected <R> void doSchedule(CommandBase<R> cmd, BiConsumer<R, Throwable> handler) { | |
if (handler == null) { | |
throw new IllegalArgumentException(); | |
} | |
@@ -220,7 +231,7 @@ public abstract class SocketConnectionBase implements Connection { | |
CompositeCommand composite = (CompositeCommand) cmd; | |
List<CommandBase<?>> commands = composite.commands(); | |
pending.addAll(commands); | |
- composite.handler.handle(Future.succeededFuture()); | |
+ composite.handler.accept(null, null); | |
} else { | |
pending.add(cmd); | |
} | |
@@ -285,10 +296,10 @@ public abstract class SocketConnectionBase implements Connection { | |
private PrepareStatementCommand prepareCommand(ExtendedQueryCommand<?> queryCmd, boolean cache, boolean sendParameterTypes) { | |
PrepareStatementCommand prepareCmd = new PrepareStatementCommand(queryCmd.sql(), null, cache, sendParameterTypes ? queryCmd.parameterTypes() : null); | |
- prepareCmd.handler = ar -> { | |
+ prepareCmd.handler = (r, err) -> { | |
paused = false; | |
- if (ar.succeeded()) { | |
- PreparedStatement ps = ar.result(); | |
+ if (err == null) { | |
+ PreparedStatement ps = r; | |
if (cache) { | |
cacheStatement(ps); | |
} | |
@@ -303,7 +314,7 @@ public abstract class SocketConnectionBase implements Connection { | |
ctx.flush(); | |
} | |
} else { | |
- Throwable cause = ar.cause(); | |
+ Throwable cause = err; | |
if (isIndeterminatePreparedStatementError(cause) && !sendParameterTypes) { | |
ChannelHandlerContext ctx = socket.channelHandlerContext(); | |
// We cannot cache this prepared statement because it might be executed with another type | |
@@ -340,9 +351,9 @@ public abstract class SocketConnectionBase implements Connection { | |
if (psCache != null && psCache.isFull()) { | |
PreparedStatement evicted = psCache.evict(); | |
CloseStatementCommand closeCmd = new CloseStatementCommand(evicted); | |
- closeCmd.handler = ar -> { | |
- if (ar.failed()) { | |
- logger.error("Error when closing cached prepared statement", ar.cause()); | |
+ closeCmd.handler = (r, err) -> { | |
+ if (err != null) { | |
+ logger.error("Error when closing cached prepared statement", err); | |
} | |
}; | |
return closeCmd; | |
diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/TransactionImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/TransactionImpl.java | |
index d2e8193..8492478 100644 | |
--- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/TransactionImpl.java | |
+++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/TransactionImpl.java | |
@@ -28,6 +28,8 @@ import io.vertx.sqlclient.internal.Connection; | |
import io.vertx.sqlclient.internal.command.CommandBase; | |
import io.vertx.sqlclient.internal.command.TxCommand; | |
+import java.util.function.BiConsumer; | |
+ | |
public class TransactionImpl implements Transaction { | |
private final ContextInternal context; | |
@@ -59,18 +61,18 @@ public class TransactionImpl implements Transaction { | |
} | |
private <R> void execute(CommandBase<R> cmd) { | |
- Handler<AsyncResult<R>> handler = cmd.handler; | |
+ BiConsumer<R, Throwable> handler = cmd.handler; | |
connection.schedule(context, cmd).onComplete(handler); | |
} | |
- private <T> Handler<AsyncResult<T>> wrap(CommandBase<?> cmd, Promise<T> handler) { | |
- return ar -> { | |
+ private <T> BiConsumer<T, Throwable> wrap(CommandBase<?> cmd, Promise<T> handler) { | |
+ return (r, err) -> { | |
CommandBase<?> abc = cmd; | |
synchronized (TransactionImpl.this) { | |
pendingQueries--; | |
} | |
checkEnd(); | |
- handler.handle(ar); | |
+ handler.handle(err == null ? Future.succeededFuture(r) : Future.failedFuture(err)); | |
}; | |
} | |
@@ -150,11 +152,11 @@ public class TransactionImpl implements Transaction { | |
private TxCommand<Void> txCommand(TxCommand.Kind kind) { | |
TxCommand<Void> cmd = new TxCommand<>(kind, null); | |
- cmd.handler = ar -> { | |
- if (ar.succeeded()) { | |
+ cmd.handler = (res, err) -> { | |
+ if (err == null) { | |
completion.complete(kind); | |
} else { | |
- completion.fail(ar.cause()); | |
+ completion.fail(err); | |
} | |
}; | |
return cmd; | |
diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java | |
index de4a84f..af71e8f 100644 | |
--- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java | |
+++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java | |
@@ -150,7 +150,7 @@ public class SqlConnectionPool { | |
public void evict() { | |
long now = System.currentTimeMillis(); | |
- pool.evict(conn -> conn.shouldEvict(now), ar -> { | |
+ pool.evict(conn -> conn.shouldEvict(now)).onComplete(ar -> { | |
if (ar.succeeded()) { | |
List<PooledConnection> res = ar.result(); | |
for (PooledConnection conn : res) { | |
@@ -184,7 +184,7 @@ public class SqlConnectionPool { | |
public <R> Future<R> execute(ContextInternal context, CommandBase<R> cmd) { | |
Promise<Lease<PooledConnection>> p = context.promise(); | |
Object metric = enqueueMetric(); | |
- pool.acquire(context, 0, p); | |
+ pool.acquire(context, 0).onComplete(p); | |
return p.future().compose(lease -> { | |
dequeueMetric(metric); | |
PooledConnection pooled = lease.get(); | |
@@ -204,7 +204,7 @@ public class SqlConnectionPool { | |
}); | |
} | |
- public void acquire(ContextInternal context, long timeout, Handler<AsyncResult<PooledConnection>> handler) { | |
+ public void acquire(ContextInternal context, long timeout, Promise<PooledConnection> handler) { | |
class PoolRequest implements PoolWaiter.Listener<PooledConnection>, Handler<AsyncResult<Lease<PooledConnection>>> { | |
private final Object metric; | |
@@ -249,7 +249,7 @@ public class SqlConnectionPool { | |
public void onEnqueue(PoolWaiter<PooledConnection> waiter) { | |
if (timeout > 0L && timerID == -1L) { | |
timerID = context.setTimer(timeout, id -> { | |
- pool.cancel(waiter, ar -> { | |
+ pool.cancel(waiter).onComplete(ar -> { | |
if (ar.succeeded()) { | |
if (ar.result()) { | |
handler.handle(Future.failedFuture("Timeout")); | |
@@ -269,12 +269,12 @@ public class SqlConnectionPool { | |
} | |
Object metric = enqueueMetric(); | |
PoolRequest request = new PoolRequest(metric); | |
- pool.acquire(context, request, 0, request); | |
+ pool.acquire(context, request, 0).onComplete(request); | |
} | |
public Future<Void> close() { | |
Promise<Void> promise = vertx.promise(); | |
- pool.close(ar1 -> { | |
+ pool.close().onComplete(ar1 -> { | |
if (ar1.succeeded()) { | |
List<Future<Void>> results = ar1 | |
.result() | |
diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CommandBase.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CommandBase.java | |
index c591c96..6c0b894 100644 | |
--- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CommandBase.java | |
+++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CommandBase.java | |
@@ -21,13 +21,15 @@ import io.vertx.core.AsyncResult; | |
import io.vertx.core.Future; | |
import io.vertx.core.Handler; | |
+import java.util.function.BiConsumer; | |
+ | |
/** | |
* @author <a href="mailto:[email protected]">Julien Viet</a> | |
*/ | |
public abstract class CommandBase<R> { | |
- public Handler<AsyncResult<R>> handler; | |
+ public BiConsumer<R, Throwable> handler; | |
public final void fail(Throwable err) { | |
complete(Future.failedFuture(err)); | |
@@ -39,7 +41,7 @@ public abstract class CommandBase<R> { | |
public final void complete(AsyncResult<R> resp) { | |
if (handler != null) { | |
- handler.handle(resp); | |
+ handler.accept(resp.result(), resp.cause()); | |
} | |
} | |
} | |
diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CommandResponse.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CommandResponse.java | |
index 85c123f..845f632 100644 | |
--- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CommandResponse.java | |
+++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CommandResponse.java | |
@@ -43,7 +43,7 @@ public class CommandResponse<R> { | |
public void fire() { | |
if (cmd.handler != null) { | |
- cmd.handler.handle(toAsyncResult()); | |
+ cmd.handler.accept(toAsyncResult().result(), toAsyncResult().cause()); | |
} | |
} | |
} | |
diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java | |
index 2fad432..921232e 100644 | |
--- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java | |
+++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java | |
@@ -175,7 +175,7 @@ public class PoolImpl extends SqlClientBase implements Pool, Closeable { | |
return pool.execute(context, cmd); | |
} | |
- private void acquire(ContextInternal context, long timeout, Handler<AsyncResult<SqlConnectionPool.PooledConnection>> completionHandler) { | |
+ private void acquire(ContextInternal context, long timeout, Promise<SqlConnectionPool.PooledConnection> completionHandler) { | |
pool.acquire(context, timeout, completionHandler); | |
} | |
diff --git a/vertx-service-discovery-backend-consul/src/main/java/io/vertx/servicediscovery/backend/consul/ConsulBackendService.java b/vertx-service-discovery-backend-consul/src/main/java/io/vertx/servicediscovery/backend/consul/ConsulBackendService.java | |
index b24f98d..16feb8e 100644 | |
--- a/vertx-service-discovery-backend-consul/src/main/java/io/vertx/servicediscovery/backend/consul/ConsulBackendService.java | |
+++ b/vertx-service-discovery-backend-consul/src/main/java/io/vertx/servicediscovery/backend/consul/ConsulBackendService.java | |
@@ -131,7 +131,13 @@ public class ConsulBackendService implements ServiceDiscoveryBackend { | |
@Override | |
public void getRecord(String uuid, Handler<AsyncResult<Record>> resultHandler) { | |
Promise<List<Record>> recordList = Promise.promise(); | |
- getRecords(recordList); | |
+ getRecords(ar -> { | |
+ if (ar.succeeded()) { | |
+ recordList.complete(ar.result()); | |
+ } else { | |
+ recordList.fail(ar.cause()); | |
+ } | |
+ }); | |
recordList.future().map(l -> l.stream().filter(r -> uuid.equals(r.getRegistration())).findFirst().orElse(null)).onComplete(resultHandler); | |
} | |
diff --git a/vertx-service-discovery-backend-zookeeper/src/main/java/io/vertx/servicediscovery/backend/zookeeper/ZookeeperBackendService.java b/vertx-service-discovery-backend-zookeeper/src/main/java/io/vertx/servicediscovery/backend/zookeeper/ZookeeperBackendService.java | |
index 359b5af..d4701c6 100644 | |
--- a/vertx-service-discovery-backend-zookeeper/src/main/java/io/vertx/servicediscovery/backend/zookeeper/ZookeeperBackendService.java | |
+++ b/vertx-service-discovery-backend-zookeeper/src/main/java/io/vertx/servicediscovery/backend/zookeeper/ZookeeperBackendService.java | |
@@ -228,7 +228,13 @@ public class ZookeeperBackendService implements ServiceDiscoveryBackend, Connect | |
List<Future<Record>> futures = new ArrayList<>(); | |
for (String child : children) { | |
Promise<Record> promise = Promise.promise(); | |
- getRecord(child, promise); | |
+ getRecord(child, ar -> { | |
+ if (ar.succeeded()) { | |
+ promise.complete(ar.result()); | |
+ } else { | |
+ promise.fail(ar.cause()); | |
+ } | |
+ }); | |
futures.add(promise.future()); | |
} | |
diff --git a/vertx-service-discovery-bridge-kubernetes/src/main/java/io/vertx/servicediscovery/kubernetes/KubernetesServiceImporter.java b/vertx-service-discovery-bridge-kubernetes/src/main/java/io/vertx/servicediscovery/kubernetes/KubernetesServiceImporter.java | |
index ffc3d54..5cd8371 100644 | |
--- a/vertx-service-discovery-bridge-kubernetes/src/main/java/io/vertx/servicediscovery/kubernetes/KubernetesServiceImporter.java | |
+++ b/vertx-service-discovery-bridge-kubernetes/src/main/java/io/vertx/servicediscovery/kubernetes/KubernetesServiceImporter.java | |
@@ -334,7 +334,7 @@ public class KubernetesServiceImporter implements ServiceImporter { | |
return result.onSuccess(tk -> this.token = tk).mapEmpty(); | |
} | |
- private void publishRecord(Record record, Handler<AsyncResult<Record>> completionHandler) { | |
+ private void publishRecord(Record record, Promise<Record> completionHandler) { | |
publisher.publish(record).onComplete(ar -> { | |
if (completionHandler != null) { | |
completionHandler.handle(ar); | |
diff --git a/vertx-service-discovery/src/main/java/io/vertx/servicediscovery/impl/DiscoveryImpl.java b/vertx-service-discovery/src/main/java/io/vertx/servicediscovery/impl/DiscoveryImpl.java | |
index 59485ab..ea4d87a 100644 | |
--- a/vertx-service-discovery/src/main/java/io/vertx/servicediscovery/impl/DiscoveryImpl.java | |
+++ b/vertx-service-discovery/src/main/java/io/vertx/servicediscovery/impl/DiscoveryImpl.java | |
@@ -170,7 +170,7 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher { | |
} | |
public ServiceDiscovery registerServiceImporter(ServiceImporter importer, JsonObject configuration, | |
- Handler<AsyncResult<Void>> completionHandler) { | |
+ Promise<Void> completionHandler) { | |
JsonObject conf; | |
if (configuration == null) { | |
conf = new JsonObject(); | |
@@ -216,7 +216,7 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher { | |
} | |
public ServiceDiscovery registerServiceExporter(ServiceExporter exporter, JsonObject configuration, | |
- Handler<AsyncResult<Void>> completionHandler) { | |
+ Promise<Void> completionHandler) { | |
JsonObject conf; | |
if (configuration == null) { | |
conf = new JsonObject(); | |
@@ -275,7 +275,7 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher { | |
}); | |
} | |
- public void publish(Record record, Handler<AsyncResult<Record>> resultHandler) { | |
+ public void publish(Record record, Promise<Record> resultHandler) { | |
Status status = record.getStatus() == null || record.getStatus() == Status.UNKNOWN | |
? Status.UP : record.getStatus(); | |
@@ -305,7 +305,7 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher { | |
return promise.future(); | |
} | |
- public void unpublish(String id, Handler<AsyncResult<Void>> resultHandler) { | |
+ public void unpublish(String id, Promise<Void> resultHandler) { | |
backend.remove(id, record -> { | |
if (record.failed()) { | |
resultHandler.handle(Future.failedFuture(record.cause())); | |
@@ -334,7 +334,7 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher { | |
} | |
public void getRecord(JsonObject filter, | |
- Handler<AsyncResult<Record>> resultHandler) { | |
+ Promise<Record> resultHandler) { | |
boolean includeOutOfService = false; | |
Function<Record, Boolean> accept; | |
if (filter == null) { | |
@@ -354,8 +354,14 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher { | |
return promise.future(); | |
} | |
- public void getRecord(String id, Handler<AsyncResult<@Nullable Record>> resultHandler) { | |
- backend.getRecord(id, resultHandler); | |
+ public void getRecord(String id, Promise<@Nullable Record> resultHandler) { | |
+ backend.getRecord(id, ar -> { | |
+ if (ar.succeeded()) { | |
+ resultHandler.complete(ar.result()); | |
+ } else { | |
+ resultHandler.fail(ar.cause()); | |
+ } | |
+ }); | |
} | |
@Override | |
@@ -365,7 +371,7 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher { | |
return promise.future(); | |
} | |
- public void getRecord(Function<Record, Boolean> filter, Handler<AsyncResult<Record>> resultHandler) { | |
+ public void getRecord(Function<Record, Boolean> filter, Promise<Record> resultHandler) { | |
getRecord(filter, false, resultHandler); | |
} | |
@@ -376,8 +382,7 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher { | |
return promise.future(); | |
} | |
- public void getRecord(Function<Record, Boolean> filter, boolean includeOutOfService, Handler<AsyncResult<Record>> | |
- resultHandler) { | |
+ public void getRecord(Function<Record, Boolean> filter, boolean includeOutOfService, Promise<Record> resultHandler) { | |
Objects.requireNonNull(filter); | |
backend.getRecords(list -> { | |
if (list.failed()) { | |
@@ -403,7 +408,7 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher { | |
return promise.future(); | |
} | |
- public void getRecords(JsonObject filter, Handler<AsyncResult<List<Record>>> resultHandler) { | |
+ public void getRecords(JsonObject filter, Promise<List<Record>> resultHandler) { | |
boolean includeOutOfService = false; | |
Function<Record, Boolean> accept; | |
if (filter == null) { | |
@@ -423,7 +428,7 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher { | |
return promise.future(); | |
} | |
- public void getRecords(Function<Record, Boolean> filter, Handler<AsyncResult<List<Record>>> resultHandler) { | |
+ public void getRecords(Function<Record, Boolean> filter, Promise<List<Record>> resultHandler) { | |
getRecords(filter, false, resultHandler); | |
} | |
@@ -434,7 +439,7 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher { | |
return promise.future(); | |
} | |
- public void getRecords(Function<Record, Boolean> filter, boolean includeOutOfService, Handler<AsyncResult<List<Record>>> resultHandler) { | |
+ public void getRecords(Function<Record, Boolean> filter, boolean includeOutOfService, Promise<List<Record>> resultHandler) { | |
Objects.requireNonNull(filter); | |
backend.getRecords(list -> { | |
if (list.failed()) { | |
@@ -457,7 +462,7 @@ public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher { | |
return promise.future(); | |
} | |
- public void update(Record record, Handler<AsyncResult<Record>> resultHandler) { | |
+ public void update(Record record, Promise<Record> resultHandler) { | |
backend.update(record, ar -> { | |
if (ar.failed()) { | |
resultHandler.handle(Future.failedFuture(ar.cause())); | |
diff --git a/src/main/java/io/vertx/cassandra/impl/ResultSetImpl.java b/src/main/java/io/vertx/cassandra/impl/ResultSetImpl.java | |
index 2309179..1d07231 100644 | |
--- a/src/main/java/io/vertx/cassandra/impl/ResultSetImpl.java | |
+++ b/src/main/java/io/vertx/cassandra/impl/ResultSetImpl.java | |
@@ -94,7 +94,7 @@ public class ResultSetImpl implements ResultSet { | |
return resultSetRef.get().wasApplied(); | |
} | |
- private void loadMore(Context context, List<Row> loaded, Handler<AsyncResult<List<Row>>> handler) { | |
+ private void loadMore(Context context, List<Row> loaded, Promise<List<Row>> handler) { | |
int availableWithoutFetching = resultSetRef.get().remaining(); | |
List<Row> rows = new ArrayList<>(loaded.size() + availableWithoutFetching); | |
rows.addAll(loaded); | |
diff --git a/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerImplTest.java b/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerImplTest.java | |
index bb62cef..07b7f11 100644 | |
--- a/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerImplTest.java | |
+++ b/src/test/java/io/vertx/circuitbreaker/impl/CircuitBreakerImplTest.java | |
@@ -787,7 +787,7 @@ public class CircuitBreakerImplTest { | |
List<AsyncResult<String>> results = new ArrayList<>(); | |
for (int i = 0; i < options.getMaxFailures(); i++) { | |
breaker.<String>execute(future -> future.fail("expected failure")) | |
- .onComplete(results::add); | |
+ .onComplete(ar -> results.add(ar)); | |
} | |
await().until(() -> results.size() == options.getMaxFailures()); | |
results.forEach(ar -> { | |
@@ -799,7 +799,7 @@ public class CircuitBreakerImplTest { | |
await().until(() -> breaker.state() == CircuitBreakerState.OPEN); | |
breaker.<String>execute(future -> future.fail("expected failure")) | |
- .onComplete(results::add); | |
+ .onComplete(ar -> results.add(ar)); | |
await().until(() -> results.size() == 1); | |
results.forEach(ar -> { | |
assertThat(ar.failed()).isTrue(); | |
@@ -820,7 +820,7 @@ public class CircuitBreakerImplTest { | |
// Ignored. | |
} | |
}) | |
- .onComplete(results::add); | |
+ .onComplete(ar -> results.add(ar)); | |
await().until(() -> results.size() == 1); | |
results.forEach(ar -> { | |
assertThat(ar.failed()).isTrue(); | |
@@ -845,7 +845,7 @@ public class CircuitBreakerImplTest { | |
t -> { | |
throw new RuntimeException("boom"); | |
}) | |
- .onComplete(results::add); | |
+ .onComplete(ar -> results.add(ar)); | |
} | |
await().until(() -> results.size() == options.getMaxFailures()); | |
results.forEach(ar -> { | |
@@ -861,7 +861,7 @@ public class CircuitBreakerImplTest { | |
t -> { | |
throw new RuntimeException("boom"); | |
}) | |
- .onComplete(results::add); | |
+ .onComplete(ar -> results.add(ar)); | |
await().until(() -> results.size() == 1); | |
results.forEach(ar -> { | |
assertThat(ar.failed()).isTrue(); | |
diff --git a/src/test/java/io/vertx/circuitbreaker/impl/MyAsyncOperations.java b/src/test/java/io/vertx/circuitbreaker/impl/MyAsyncOperations.java | |
index 0f5021a..f445e69 100644 | |
--- a/src/test/java/io/vertx/circuitbreaker/impl/MyAsyncOperations.java | |
+++ b/src/test/java/io/vertx/circuitbreaker/impl/MyAsyncOperations.java | |
@@ -27,7 +27,7 @@ import io.vertx.core.Promise; | |
*/ | |
public class MyAsyncOperations { | |
- public static void operation(int a, int b, Handler<AsyncResult<Integer>> handler) { | |
+ public static void operation(int a, int b, Promise<Integer> handler) { | |
handler.handle(Future.succeededFuture(a + b)); | |
} | |
diff --git a/src/test/java/io/vertx/circuitbreaker/impl/UsageTest.java b/src/test/java/io/vertx/circuitbreaker/impl/UsageTest.java | |
index 91b921a..5b93da3 100644 | |
--- a/src/test/java/io/vertx/circuitbreaker/impl/UsageTest.java | |
+++ b/src/test/java/io/vertx/circuitbreaker/impl/UsageTest.java | |
@@ -3,10 +3,7 @@ package io.vertx.circuitbreaker.impl; | |
import com.github.tomakehurst.wiremock.junit.WireMockRule; | |
import io.vertx.circuitbreaker.CircuitBreaker; | |
import io.vertx.circuitbreaker.CircuitBreakerOptions; | |
-import io.vertx.core.AsyncResult; | |
-import io.vertx.core.Future; | |
-import io.vertx.core.Handler; | |
-import io.vertx.core.Vertx; | |
+import io.vertx.core.*; | |
import io.vertx.core.buffer.Buffer; | |
import io.vertx.core.eventbus.Message; | |
import io.vertx.core.eventbus.MessageConsumer; | |
@@ -161,7 +158,7 @@ public class UsageTest { | |
private List<String> items = new ArrayList<>(); | |
- public void asyncWrite(String content, Scenario scenario, Handler<AsyncResult<Void>> resultHandler) { | |
+ public void asyncWrite(String content, Scenario scenario, Promise<Void> resultHandler) { | |
long random = (long) (Math.random() * 1000); | |
switch (scenario) { | |
case TIMEOUT: | |
diff --git a/src/main/java/io/vertx/ext/shell/command/impl/CommandRegistryImpl.java b/src/main/java/io/vertx/ext/shell/command/impl/CommandRegistryImpl.java | |
index 1491aec..f4a734b 100644 | |
--- a/src/main/java/io/vertx/ext/shell/command/impl/CommandRegistryImpl.java | |
+++ b/src/main/java/io/vertx/ext/shell/command/impl/CommandRegistryImpl.java | |
@@ -40,6 +40,7 @@ import io.vertx.ext.shell.command.CommandRegistry; | |
import java.util.*; | |
import java.util.concurrent.ConcurrentHashMap; | |
+import java.util.function.BiConsumer; | |
import java.util.stream.Collectors; | |
/** | |
@@ -90,7 +91,7 @@ public class CommandRegistryImpl implements CommandRegistry { | |
return registerCommand(Command.create(vertx, command)); | |
} | |
- public CommandRegistry registerCommand(Class<? extends AnnotatedCommand> command, Handler<AsyncResult<Command>> completionHandler) { | |
+ public CommandRegistry registerCommand(Class<? extends AnnotatedCommand> command, Promise<Command> completionHandler) { | |
return registerCommand(Command.create(vertx, command), completionHandler); | |
} | |
@@ -101,13 +102,13 @@ public class CommandRegistryImpl implements CommandRegistry { | |
return promise.future(); | |
} | |
- public CommandRegistry registerCommand(Command command, Handler<AsyncResult<Command>> completionHandler) { | |
- return registerCommands(Collections.singletonList(command), ar -> { | |
+ public CommandRegistry registerCommand(Command command, Promise<Command> completionHandler) { | |
+ return registerCommands(Collections.singletonList(command), (res, err) -> { | |
if (completionHandler != null) { | |
- if (ar.succeeded()) { | |
- completionHandler.handle(Future.succeededFuture(ar.result().get(0))); | |
+ if (err == null) { | |
+ completionHandler.handle(Future.succeededFuture(res.get(0))); | |
} else { | |
- completionHandler.handle(Future.failedFuture(ar.cause())); | |
+ completionHandler.handle(Future.failedFuture(err)); | |
} | |
} | |
}); | |
@@ -120,7 +121,7 @@ public class CommandRegistryImpl implements CommandRegistry { | |
return promise.future(); | |
} | |
- public CommandRegistry registerCommands(List<Command> commands, Handler<AsyncResult<List<Command>>> doneHandler) { | |
+ public CommandRegistry registerCommands(List<Command> commands, BiConsumer<List<Command>, Throwable> doneHandler) { | |
if (closed) { | |
throw new IllegalStateException(); | |
} | |
@@ -153,9 +154,9 @@ public class CommandRegistryImpl implements CommandRegistry { | |
filter(reg -> ar.result().equals(reg.deploymendID)). | |
map(reg -> reg.command). | |
collect(Collectors.toList()); | |
- doneHandler.handle(Future.succeededFuture(regs)); | |
+ doneHandler.accept(regs, null); | |
} else { | |
- doneHandler.handle(Future.failedFuture(ar.cause())); | |
+ doneHandler.accept(null, ar.cause()); | |
} | |
}); | |
return this; | |
@@ -168,7 +169,7 @@ public class CommandRegistryImpl implements CommandRegistry { | |
return promise.future(); | |
} | |
- public CommandRegistry unregisterCommand(String name, Handler<AsyncResult<Void>> completionHandler) { | |
+ public CommandRegistry unregisterCommand(String name, Promise<Void> completionHandler) { | |
if (closed) { | |
throw new IllegalStateException(); | |
} | |
diff --git a/src/main/java/io/vertx/ext/shell/impl/ShellServiceImpl.java b/src/main/java/io/vertx/ext/shell/impl/ShellServiceImpl.java | |
index f30f69a..d93ef17 100644 | |
--- a/src/main/java/io/vertx/ext/shell/impl/ShellServiceImpl.java | |
+++ b/src/main/java/io/vertx/ext/shell/impl/ShellServiceImpl.java | |
@@ -118,7 +118,7 @@ public class ShellServiceImpl implements ShellService { | |
return p.future(); | |
} | |
- private void startServer(List<CommandResolver> resolvers, Handler<AsyncResult<Void>> startHandler) { | |
+ private void startServer(List<CommandResolver> resolvers, Promise<Void> startHandler) { | |
TelnetTermOptions telnetOptions = options.getTelnetOptions(); | |
SSHTermOptions sshOptions = options.getSSHOptions(); | |
HttpTermOptions webOptions = options.getHttpOptions(); | |
diff --git a/src/main/java/io/vertx/ext/shell/term/impl/HttpTermServer.java b/src/main/java/io/vertx/ext/shell/term/impl/HttpTermServer.java | |
index c156bb8..6a3b797 100644 | |
--- a/src/main/java/io/vertx/ext/shell/term/impl/HttpTermServer.java | |
+++ b/src/main/java/io/vertx/ext/shell/term/impl/HttpTermServer.java | |
@@ -33,10 +33,7 @@ | |
package io.vertx.ext.shell.term.impl; | |
import io.termd.core.readline.Keymap; | |
-import io.vertx.core.AsyncResult; | |
-import io.vertx.core.Future; | |
-import io.vertx.core.Handler; | |
-import io.vertx.core.Vertx; | |
+import io.vertx.core.*; | |
import io.vertx.core.buffer.Buffer; | |
import io.vertx.core.http.HttpServer; | |
import io.vertx.ext.auth.authentication.AuthenticationProvider; | |
@@ -87,7 +84,7 @@ public class HttpTermServer implements TermServer { | |
return this; | |
} | |
- public TermServer listen(Handler<AsyncResult<Void>> listenHandler) { | |
+ public TermServer listen(Promise<Void> listenHandler) { | |
Charset charset = Charset.forName(options.getCharset()); | |
@@ -169,7 +166,7 @@ public class HttpTermServer implements TermServer { | |
return Future.future(this::close); | |
} | |
- public void close(Handler<AsyncResult<Void>> completionHandler) { | |
+ public void close(Promise<Void> completionHandler) { | |
if (server != null) { | |
server.close() | |
.onComplete(completionHandler); | |
diff --git a/src/main/java/io/vertx/ext/shell/term/impl/SSHServer.java b/src/main/java/io/vertx/ext/shell/term/impl/SSHServer.java | |
index ab314ae..c31c19a 100644 | |
--- a/src/main/java/io/vertx/ext/shell/term/impl/SSHServer.java | |
+++ b/src/main/java/io/vertx/ext/shell/term/impl/SSHServer.java | |
@@ -117,7 +117,7 @@ public class SSHServer implements TermServer { | |
return this; | |
} | |
- public SSHServer listen(Handler<AsyncResult<Void>> listenHandler) { | |
+ public SSHServer listen(Promise<Void> listenHandler) { | |
if (!status.compareAndSet(STATUS_STOPPED, STATUS_STARTING)) { | |
listenHandler.handle(Future.failedFuture("Invalid state:" + status.get())); | |
return this; | |
@@ -219,7 +219,7 @@ public class SSHServer implements TermServer { | |
return nativeServer.getPort(); | |
} | |
- public void close(Handler<AsyncResult<Void>> completionHandler) { | |
+ public void close(Promise<Void> completionHandler) { | |
if (!status.compareAndSet(STATUS_STARTED, STATUS_STOPPING)) { | |
completionHandler.handle(Future.failedFuture("Invalid state:" + status.get())); | |
return; | |
diff --git a/src/main/java/io/vertx/ext/shell/term/impl/TelnetTermServer.java b/src/main/java/io/vertx/ext/shell/term/impl/TelnetTermServer.java | |
index 630b55d..298aae2 100644 | |
--- a/src/main/java/io/vertx/ext/shell/term/impl/TelnetTermServer.java | |
+++ b/src/main/java/io/vertx/ext/shell/term/impl/TelnetTermServer.java | |
@@ -34,10 +34,7 @@ package io.vertx.ext.shell.term.impl; | |
import io.termd.core.readline.Keymap; | |
import io.termd.core.telnet.TelnetTtyConnection; | |
-import io.vertx.core.AsyncResult; | |
-import io.vertx.core.Future; | |
-import io.vertx.core.Handler; | |
-import io.vertx.core.Vertx; | |
+import io.vertx.core.*; | |
import io.vertx.core.buffer.Buffer; | |
import io.vertx.core.net.NetServer; | |
import io.vertx.ext.auth.authentication.AuthenticationProvider; | |
@@ -76,7 +73,7 @@ public class TelnetTermServer implements TermServer { | |
return this; | |
} | |
- public TermServer listen(Handler<AsyncResult<Void>> listenHandler) { | |
+ public TermServer listen(Promise<Void> listenHandler) { | |
Charset charset = Charset.forName(options.getCharset()); | |
if (server == null) { | |
server = vertx.createNetServer(options); | |
@@ -104,7 +101,7 @@ public class TelnetTermServer implements TermServer { | |
return this; | |
} | |
- public void close(Handler<AsyncResult<Void>> completionHandler) { | |
+ public void close(Promise<Void> completionHandler) { | |
if (server != null) { | |
server.close() | |
.onComplete(completionHandler); | |
diff --git a/src/test/java/io/vertx/ext/shell/support/TestTermServer.java b/src/test/java/io/vertx/ext/shell/support/TestTermServer.java | |
index b48e1a6..6cf17c6 100644 | |
--- a/src/test/java/io/vertx/ext/shell/support/TestTermServer.java | |
+++ b/src/test/java/io/vertx/ext/shell/support/TestTermServer.java | |
@@ -33,10 +33,7 @@ | |
package io.vertx.ext.shell.support; | |
import io.termd.core.tty.TtyConnection; | |
-import io.vertx.core.AsyncResult; | |
-import io.vertx.core.Future; | |
-import io.vertx.core.Handler; | |
-import io.vertx.core.Vertx; | |
+import io.vertx.core.*; | |
import io.vertx.ext.auth.authentication.AuthenticationProvider; | |
import io.vertx.ext.shell.term.Term; | |
import io.vertx.ext.shell.term.TermServer; | |
@@ -73,7 +70,7 @@ public class TestTermServer implements TermServer { | |
throw new UnsupportedOperationException(); | |
} | |
- public TermServer listen(Handler<AsyncResult<Void>> listenHandler) { | |
+ public TermServer listen(Promise<Void> listenHandler) { | |
listenHandler.handle(Future.succeededFuture()); | |
return this; | |
} | |
@@ -83,7 +80,7 @@ public class TestTermServer implements TermServer { | |
throw new UnsupportedOperationException(); | |
} | |
- public void close(Handler<AsyncResult<Void>> completionHandler) { | |
+ public void close(Promise<Void> completionHandler) { | |
completionHandler.handle(Future.succeededFuture()); | |
} | |
diff --git a/src/main/java/io/vertx/ext/stomp/impl/DefaultStompHandler.java b/src/main/java/io/vertx/ext/stomp/impl/DefaultStompHandler.java | |
index c1594e2..25c39d3 100644 | |
--- a/src/main/java/io/vertx/ext/stomp/impl/DefaultStompHandler.java | |
+++ b/src/main/java/io/vertx/ext/stomp/impl/DefaultStompHandler.java | |
@@ -16,11 +16,7 @@ | |
package io.vertx.ext.stomp.impl; | |
-import io.vertx.core.AsyncResult; | |
-import io.vertx.core.Context; | |
-import io.vertx.core.Future; | |
-import io.vertx.core.Handler; | |
-import io.vertx.core.Vertx; | |
+import io.vertx.core.*; | |
import io.vertx.core.internal.ContextInternal; | |
import io.vertx.core.internal.PromiseInternal; | |
import io.vertx.core.internal.logging.Logger; | |
@@ -420,7 +416,7 @@ public class DefaultStompHandler implements StompServerHandler { | |
public StompServerHandler onAuthenticationRequest(StompServerConnection connection, | |
String login, String passcode, | |
- Handler<AsyncResult<Boolean>> handler) { | |
+ Promise<Boolean> handler) { | |
final AuthenticationProvider auth; | |
synchronized (this) { | |
// Stack contention. | |
diff --git a/src/main/java/io/vertx/ext/stomp/impl/StompClientConnectionImpl.java b/src/main/java/io/vertx/ext/stomp/impl/StompClientConnectionImpl.java | |
index 35ff102..9fcd5b8 100644 | |
--- a/src/main/java/io/vertx/ext/stomp/impl/StompClientConnectionImpl.java | |
+++ b/src/main/java/io/vertx/ext/stomp/impl/StompClientConnectionImpl.java | |
@@ -224,7 +224,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler | |
return promise.future(); | |
} | |
- public synchronized StompClientConnection send(Frame frame, Handler<AsyncResult<Frame>> receiptHandler) { | |
+ public synchronized StompClientConnection send(Frame frame, Promise<Frame> receiptHandler) { | |
if (receiptHandler != null && frame.getCommand() != Command.PING) { | |
String receiptId = UUID.randomUUID().toString(); | |
frame.addHeader(Frame.RECEIPT, receiptId); | |
@@ -252,7 +252,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler | |
} | |
public StompClientConnection send(String destination, Map<String, String> headers, Buffer body, | |
- Handler<AsyncResult<Frame>> receiptHandler) { | |
+ Promise<Frame> receiptHandler) { | |
// No need for synchronization, no field access, except client (final) | |
if (headers == null) { | |
headers = new Headers(); | |
@@ -282,7 +282,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler | |
return promise.future(); | |
} | |
- public StompClientConnection subscribe(String destination, Handler<Frame> handler, Handler<AsyncResult<String>> receiptHandler) { | |
+ public StompClientConnection subscribe(String destination, Handler<Frame> handler, Promise<String> receiptHandler) { | |
return subscribe(destination, null, handler, receiptHandler); | |
} | |
@@ -293,7 +293,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler | |
return promise.future(); | |
} | |
- public synchronized StompClientConnection subscribe(String destination, Map<String, String> headers, Handler<Frame> handler, Handler<AsyncResult<String>> receiptHandler) { | |
+ public synchronized StompClientConnection subscribe(String destination, Map<String, String> headers, Handler<Frame> handler, Promise<String> receiptHandler) { | |
Objects.requireNonNull(destination); | |
Objects.requireNonNull(handler); | |
@@ -319,7 +319,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler | |
} | |
Frame frame = new Frame(Command.SUBSCRIBE, headers, null); | |
- send(frame, ar -> { | |
+ Future.<Frame>future(p -> send(frame, p)).onComplete(ar -> { | |
if (receiptHandler != null) { | |
receiptHandler.handle(ar.map(id)); | |
} | |
@@ -342,7 +342,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler | |
return promise.future(); | |
} | |
- public synchronized StompClientConnection unsubscribe(String destination, Map<String, String> headers, Handler<AsyncResult<Frame>> | |
+ public synchronized StompClientConnection unsubscribe(String destination, Map<String, String> headers, Promise<Frame> | |
receiptHandler) { | |
Objects.requireNonNull(destination); | |
if (headers == null) { | |
@@ -382,7 +382,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler | |
return this; | |
} | |
- public StompClientConnection beginTX(String id, Handler<AsyncResult<Frame>> receiptHandler) { | |
+ public StompClientConnection beginTX(String id, Promise<Frame> receiptHandler) { | |
return beginTX(id, new Headers(), receiptHandler); | |
} | |
@@ -398,7 +398,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler | |
return promise.future(); | |
} | |
- public StompClientConnection beginTX(String id, Map<String, String> headers, Handler<AsyncResult<Frame>> receiptHandler) { | |
+ public StompClientConnection beginTX(String id, Map<String, String> headers, Promise<Frame> receiptHandler) { | |
Objects.requireNonNull(id); | |
Objects.requireNonNull(headers); | |
@@ -410,7 +410,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler | |
return commit(id, new Headers()); | |
} | |
- public StompClientConnection commit(String id, Handler<AsyncResult<Frame>> receiptHandler) { | |
+ public StompClientConnection commit(String id, Promise<Frame> receiptHandler) { | |
return commit(id, new Headers(), receiptHandler); | |
} | |
@@ -421,7 +421,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler | |
return promise.future(); | |
} | |
- public StompClientConnection commit(String id, Map<String, String> headers, Handler<AsyncResult<Frame>> receiptHandler) { | |
+ public StompClientConnection commit(String id, Map<String, String> headers, Promise<Frame> receiptHandler) { | |
Objects.requireNonNull(id); | |
Objects.requireNonNull(headers); | |
return send(new Frame().setCommand(Command.COMMIT).setTransaction(id), receiptHandler); | |
@@ -432,7 +432,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler | |
return abort(id, new Headers()); | |
} | |
- public StompClientConnection abort(String id, Handler<AsyncResult<Frame>> receiptHandler) { | |
+ public StompClientConnection abort(String id, Promise<Frame> receiptHandler) { | |
return abort(id, new Headers(), receiptHandler); | |
} | |
@@ -443,7 +443,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler | |
return promise.future(); | |
} | |
- public StompClientConnection abort(String id, Map<String, String> headers, Handler<AsyncResult<Frame>> receiptHandler) { | |
+ public StompClientConnection abort(String id, Map<String, String> headers, Promise<Frame> receiptHandler) { | |
Objects.requireNonNull(id); | |
Objects.requireNonNull(headers); | |
return send(new Frame().setCommand(Command.ABORT).setTransaction(id), receiptHandler); | |
@@ -463,16 +463,17 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler | |
return promise.future(); | |
} | |
- public StompClientConnection disconnect(Handler<AsyncResult<Frame>> receiptHandler) { | |
+ public StompClientConnection disconnect(Promise<Frame> receiptHandler) { | |
return disconnect(new Frame().setCommand(Command.DISCONNECT), receiptHandler); | |
} | |
- public StompClientConnection disconnect(Frame frame, Handler<AsyncResult<Frame>> receiptHandler) { | |
+ public StompClientConnection disconnect(Frame frame, Promise<Frame> receiptHandler) { | |
Objects.requireNonNull(frame); | |
synchronized (this) { | |
if (status == Status.CONNECTED) { | |
status = Status.CLOSING; | |
- send(frame, f -> { | |
+ Future.<Frame>future(p -> send(frame, p)) | |
+ .onComplete(f -> { | |
if (receiptHandler != null) { | |
receiptHandler.handle(f); | |
} | |
@@ -493,7 +494,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler | |
return promise.future(); | |
} | |
- public StompClientConnection ack(String id, Handler<AsyncResult<Frame>> receiptHandler) { | |
+ public StompClientConnection ack(String id, Promise<Frame> receiptHandler) { | |
Objects.requireNonNull(id); | |
send(new Frame(Command.ACK, Headers.create(Frame.ID, id), null), receiptHandler); | |
return this; | |
@@ -505,7 +506,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler | |
return promise.future(); | |
} | |
- public StompClientConnection nack(String id, Handler<AsyncResult<Frame>> receiptHandler) { | |
+ public StompClientConnection nack(String id, Promise<Frame> receiptHandler) { | |
Objects.requireNonNull(id); | |
send(new Frame(Command.NACK, Headers.create(Frame.ID, id), null), receiptHandler); | |
return this; | |
@@ -518,7 +519,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler | |
return promise.future(); | |
} | |
- public StompClientConnection ack(String id, String txId, Handler<AsyncResult<Frame>> receiptHandler) { | |
+ public StompClientConnection ack(String id, String txId, Promise<Frame> receiptHandler) { | |
Objects.requireNonNull(id, "A ACK frame must contain the ACK id"); | |
Objects.requireNonNull(txId); | |
@@ -534,7 +535,7 @@ public class StompClientConnectionImpl implements StompClientConnection, Handler | |
return promise.future(); | |
} | |
- public StompClientConnection nack(String id, String txId, Handler<AsyncResult<Frame>> receiptHandler) { | |
+ public StompClientConnection nack(String id, String txId, Promise<Frame> receiptHandler) { | |
Objects.requireNonNull(id, "A NACK frame must contain the ACK id"); | |
Objects.requireNonNull(txId); | |
diff --git a/src/main/java/io/vertx/ext/stomp/impl/StompClientImpl.java b/src/main/java/io/vertx/ext/stomp/impl/StompClientImpl.java | |
index 7df05c4..9511bdf 100644 | |
--- a/src/main/java/io/vertx/ext/stomp/impl/StompClientImpl.java | |
+++ b/src/main/java/io/vertx/ext/stomp/impl/StompClientImpl.java | |
@@ -55,7 +55,7 @@ public class StompClientImpl implements StompClient { | |
this.client = (NetClientInternal) vertx.createNetClient(options); | |
} | |
- public StompClient connect(Handler<AsyncResult<StompClientConnection>> resultHandler) { | |
+ public StompClient connect(Promise<StompClientConnection> resultHandler) { | |
return connect(options.getPort(), options.getHost(), resultHandler); | |
} | |
@@ -142,7 +142,7 @@ public class StompClientImpl implements StompClient { | |
return client == null; | |
} | |
- public synchronized StompClient connect(int port, String host, Handler<AsyncResult<StompClientConnection>> resultHandler) { | |
+ public synchronized StompClient connect(int port, String host, Promise<StompClientConnection> resultHandler) { | |
Handler<Frame> r = receivedFrameHandler; | |
Handler<Frame> w = writingFrameHandler; | |
diff --git a/src/main/java/io/vertx/ext/stomp/impl/StompServerImpl.java b/src/main/java/io/vertx/ext/stomp/impl/StompServerImpl.java | |
index 86044a8..020573b 100644 | |
--- a/src/main/java/io/vertx/ext/stomp/impl/StompServerImpl.java | |
+++ b/src/main/java/io/vertx/ext/stomp/impl/StompServerImpl.java | |
@@ -81,7 +81,7 @@ public class StompServerImpl implements StompServer { | |
return promise.future(); | |
} | |
- public StompServer listen(Handler<AsyncResult<StompServer>> handler) { | |
+ public StompServer listen(Promise<StompServer> handler) { | |
return listen(options.getPort(), options.getHost(), handler); | |
} | |
@@ -97,7 +97,7 @@ public class StompServerImpl implements StompServer { | |
return promise.future(); | |
} | |
- public StompServer listen(int port, String host, Handler<AsyncResult<StompServer>> handler) { | |
+ public StompServer listen(int port, String host, Promise<StompServer> handler) { | |
if (port == -1) { | |
handler.handle(Future.failedFuture("TCP server disabled. The port is set to '-1'.")); | |
return this; | |
@@ -202,7 +202,7 @@ public class StompServerImpl implements StompServer { | |
return handler; | |
} | |
- public void close(Handler<AsyncResult<Void>> done) { | |
+ public void close(Promise<Void> done) { | |
if (!listening) { | |
if (done != null) { | |
vertx.runOnContext((v) -> done.handle(Future.succeededFuture())); | |
diff --git a/src/test/java/io/vertx/ext/stomp/impl/ReceiptTest.java b/src/test/java/io/vertx/ext/stomp/impl/ReceiptTest.java | |
index 1eb26f7..c15d6c1 100644 | |
--- a/src/test/java/io/vertx/ext/stomp/impl/ReceiptTest.java | |
+++ b/src/test/java/io/vertx/ext/stomp/impl/ReceiptTest.java | |
@@ -83,7 +83,7 @@ public class ReceiptTest { | |
List<AsyncResult<?>> receipts = new CopyOnWriteArrayList<>(); | |
client((ar -> { | |
final StompClientConnection connection = ar.result(); | |
- connection.subscribe("/queue", frames::add).onComplete(receipts::add); | |
+ connection.subscribe("/queue", frames::add).onComplete(a -> receipts.add(a)); | |
})); | |
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> Helper.hasDestination(server.stompHandler().getDestinations(), "/queue")); | |
@@ -93,7 +93,7 @@ public class ReceiptTest { | |
client((ar -> { | |
final StompClientConnection connection = ar.result(); | |
- connection.send("/queue", Buffer.buffer("Hello")).onComplete(receipts::add); | |
+ connection.send("/queue", Buffer.buffer("Hello")).onComplete(a -> receipts.add(a)); | |
})); | |
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> !frames.isEmpty()); | |
@@ -108,25 +108,25 @@ public class ReceiptTest { | |
AtomicReference<StompClientConnection> client = new AtomicReference<>(); | |
client((ar -> { | |
final StompClientConnection connection = ar.result(); | |
- connection.subscribe("/queue", frames::add).onComplete(receipts::add); | |
+ connection.subscribe("/queue", frames::add).onComplete(a -> receipts.add(a)); | |
})); | |
client((ar -> { | |
final StompClientConnection connection = ar.result(); | |
client.set(connection); | |
- connection.subscribe("/queue", frames::add).onComplete(receipts::add); | |
+ connection.subscribe("/queue", frames::add).onComplete(a -> receipts.add(a)); | |
})); | |
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> Helper.hasDestination(server.stompHandler().getDestinations(), "/queue")); | |
client((ar -> { | |
final StompClientConnection connection = ar.result(); | |
- connection.send("/queue", Buffer.buffer("Hello")).onComplete(receipts::add); | |
+ connection.send("/queue", Buffer.buffer("Hello")).onComplete(a -> receipts.add(a)); | |
})); | |
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> frames.size() >= 2); | |
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> receipts.size() == 3); | |
- client.get().unsubscribe("/queue").onComplete(receipts::add); | |
+ client.get().unsubscribe("/queue").onComplete(a -> receipts.add(a)); | |
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> receipts.size() == 4); | |
} | |
@@ -136,14 +136,14 @@ public class ReceiptTest { | |
client((ar -> { | |
final StompClientConnection connection = ar.result(); | |
connection.subscribe("/queue", Headers.create(Frame.ACK, "client"), | |
- frame -> connection.ack(frame.getAck())).onComplete(receipts::add); | |
+ frame -> connection.ack(frame.getAck())).onComplete(a -> receipts.add(a)); | |
})); | |
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> Helper.hasDestination(server.stompHandler().getDestinations(), "/queue")); | |
client((ar -> { | |
final StompClientConnection connection = ar.result(); | |
- connection.send("/queue", Buffer.buffer("Hello")).onComplete(receipts::add); | |
+ connection.send("/queue", Buffer.buffer("Hello")).onComplete(a -> receipts.add(a)); | |
})); | |
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> receipts.size() == 2); | |
@@ -155,14 +155,14 @@ public class ReceiptTest { | |
client((ar -> { | |
final StompClientConnection connection = ar.result(); | |
connection.subscribe("/queue", Headers.create(Frame.ACK, "client"), | |
- frame -> connection.nack(frame.getAck())).onComplete(receipts::add); | |
+ frame -> connection.nack(frame.getAck())).onComplete(a -> receipts.add(a)); | |
})); | |
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> Helper.hasDestination(server.stompHandler().getDestinations(), "/queue")); | |
client((ar -> { | |
final StompClientConnection connection = ar.result(); | |
- connection.send("/queue", Buffer.buffer("Hello")).onComplete(receipts::add); | |
+ connection.send("/queue", Buffer.buffer("Hello")).onComplete(a -> receipts.add(a)); | |
})); | |
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> receipts.size() == 2); | |
@@ -175,7 +175,7 @@ public class ReceiptTest { | |
List<Frame> errors = new CopyOnWriteArrayList<>(); | |
client((ar -> { | |
final StompClientConnection connection = ar.result(); | |
- connection.subscribe("/queue", frames::add).onComplete(receipts::add); | |
+ connection.subscribe("/queue", frames::add).onComplete(a -> receipts.add(a)); | |
})); | |
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> receipts.size() == 1); | |
@@ -183,14 +183,14 @@ public class ReceiptTest { | |
client((ar -> { | |
final StompClientConnection connection = ar.result(); | |
connection.errorHandler(errors::add); | |
- connection.beginTX("my-tx").onComplete(receipts::add); | |
+ connection.beginTX("my-tx").onComplete(a -> receipts.add(a)); | |
connection.send(new Frame().setCommand(Command.SEND).setDestination("/queue").setTransaction("my-tx") | |
- .setBody(Buffer.buffer("Hello"))).onComplete(receipts::add); | |
+ .setBody(Buffer.buffer("Hello"))).onComplete(a -> receipts.add(a)); | |
connection.send(new Frame().setCommand(Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody( | |
- Buffer.buffer("World"))).onComplete(receipts::add); | |
+ Buffer.buffer("World"))).onComplete(a -> receipts.add(a)); | |
connection.send(new Frame().setCommand(Command.SEND).setDestination("/queue").setTransaction("my-tx") | |
- .setBody(Buffer.buffer("!!!"))).onComplete(receipts::add); | |
- connection.commit("my-tx").onComplete(receipts::add); | |
+ .setBody(Buffer.buffer("!!!"))).onComplete(a -> receipts.add(a)); | |
+ connection.commit("my-tx").onComplete(a -> receipts.add(a)); | |
})); | |
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> frames.size() == 3 && errors.isEmpty() | |
@@ -205,7 +205,7 @@ public class ReceiptTest { | |
client((ar -> { | |
final StompClientConnection connection = ar.result(); | |
- connection.subscribe("/queue", frames::add).onComplete(receipts::add); | |
+ connection.subscribe("/queue", frames::add).onComplete(a -> receipts.add(a)); | |
})); | |
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> receipts.size() == 1); | |
@@ -213,14 +213,14 @@ public class ReceiptTest { | |
client((ar -> { | |
final StompClientConnection connection = ar.result(); | |
connection.errorHandler(errors::add); | |
- connection.beginTX("my-tx").onComplete(receipts::add); | |
+ connection.beginTX("my-tx").onComplete(a -> receipts.add(a)); | |
connection.send(new Frame().setCommand(Command.SEND).setDestination("/queue").setTransaction("my-tx") | |
- .setBody(Buffer.buffer("Hello"))).onComplete(receipts::add); | |
+ .setBody(Buffer.buffer("Hello"))).onComplete(a -> receipts.add(a)); | |
connection.send(new Frame().setCommand(Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody( | |
- Buffer.buffer("World"))).onComplete(receipts::add); | |
+ Buffer.buffer("World"))).onComplete(a -> receipts.add(a)); | |
connection.send(new Frame().setCommand(Command.SEND).setDestination("/queue").setTransaction("my-tx") | |
- .setBody(Buffer.buffer("!!!"))).onComplete(receipts::add); | |
- connection.abort("my-tx").onComplete(receipts::add); | |
+ .setBody(Buffer.buffer("!!!"))).onComplete(a -> receipts.add(a)); | |
+ connection.abort("my-tx").onComplete(a -> receipts.add(a)); | |
})); | |
Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> frames.size() == 0 && errors.isEmpty() | |
diff --git a/src/main/java/io/vertx/ext/mail/impl/SMTPConnection.java b/src/main/java/io/vertx/ext/mail/impl/SMTPConnection.java | |
index 1a43043..039a9e0 100644 | |
--- a/src/main/java/io/vertx/ext/mail/impl/SMTPConnection.java | |
+++ b/src/main/java/io/vertx/ext/mail/impl/SMTPConnection.java | |
@@ -58,9 +58,9 @@ public class SMTPConnection { | |
private boolean inuse; | |
private boolean quitSent; | |
- private Handler<AsyncResult<String>> commandReplyHandler; | |
+ private Promise<String> commandReplyHandler; | |
private Handler<Throwable> exceptionHandler; | |
- private Handler<AsyncResult<Void>> closeHandler; | |
+ private Promise<Void> closeHandler; | |
private Capabilities capa = new Capabilities(); | |
private final ContextInternal context; | |
private long expirationTimestamp; | |
@@ -104,7 +104,7 @@ public class SMTPConnection { | |
log.error("dropping reply arriving after we stopped processing the buffer."); | |
} else { | |
// make sure we only call the handler once | |
- Handler<AsyncResult<String>> currentHandler = commandReplyHandler; | |
+ Promise<String> currentHandler = commandReplyHandler; | |
commandReplyHandler = null; | |
if (currentHandler != null) { | |
currentHandler.handle(Future.succeededFuture(buffer.toString())); | |
@@ -265,7 +265,7 @@ public class SMTPConnection { | |
private void handleError(Throwable t) { | |
context.emit(roc -> { | |
- Handler<AsyncResult<String>> currentHandler = commandReplyHandler; | |
+ Promise<String> currentHandler = commandReplyHandler; | |
if (currentHandler != null) { | |
commandReplyHandler = null; | |
currentHandler.handle(Future.failedFuture(t)); | |
diff --git a/vertx-grpc/src/main/java/io/vertx/grpc/VertxServer.java b/vertx-grpc/src/main/java/io/vertx/grpc/VertxServer.java | |
index 19e4573..356369e 100644 | |
--- a/vertx-grpc/src/main/java/io/vertx/grpc/VertxServer.java | |
+++ b/vertx-grpc/src/main/java/io/vertx/grpc/VertxServer.java | |
@@ -171,23 +171,25 @@ public class VertxServer extends Server { | |
@Override | |
public VertxServer start() throws IOException { | |
- return start(ar -> {}); | |
+ start(); | |
+ return this; | |
} | |
- public VertxServer start(Handler<AsyncResult<Void>> completionHandler) { | |
+ public Future<Void> start2() { | |
if (id.port > 0) { | |
actual = map.computeIfAbsent(id, id -> new ActualServer(context.owner(), id, options, builder, commandDecorator)); | |
} else { | |
actual = new ActualServer(context.owner(), id, options, builder, commandDecorator); | |
} | |
+ Promise<Void> p = Promise.promise(); | |
actual.start(context, ar1 -> { | |
if (ar1.succeeded()) { | |
hook = this::shutdown; | |
context.addCloseHook(hook); | |
} | |
- completionHandler.handle(ar1); | |
+ p.handle(ar1); | |
}); | |
- return this; | |
+ return p.future(); | |
} | |
@Override | |
diff --git a/vertx-grpc/src/test/java/io/vertx/ext/grpc/CommandDecoratorTest.java b/vertx-grpc/src/test/java/io/vertx/ext/grpc/CommandDecoratorTest.java | |
index 2b76932..098e9e7 100644 | |
--- a/vertx-grpc/src/test/java/io/vertx/ext/grpc/CommandDecoratorTest.java | |
+++ b/vertx-grpc/src/test/java/io/vertx/ext/grpc/CommandDecoratorTest.java | |
@@ -38,7 +38,7 @@ public class CommandDecoratorTest extends GrpcTestBase { | |
} | |
}).build(); | |
- server.start(ar -> { | |
+ server.start2().onComplete(ar -> { | |
if (ar.succeeded()) { | |
if (server.getRawServer() == null) { | |
should.fail("The underlying server not exposed (server.getRawServer())"); | |
diff --git a/vertx-grpc/src/test/java/io/vertx/ext/grpc/GrpcTestBase.java b/vertx-grpc/src/test/java/io/vertx/ext/grpc/GrpcTestBase.java | |
index 2222711..b7588b2 100644 | |
--- a/vertx-grpc/src/test/java/io/vertx/ext/grpc/GrpcTestBase.java | |
+++ b/vertx-grpc/src/test/java/io/vertx/ext/grpc/GrpcTestBase.java | |
@@ -61,13 +61,10 @@ public abstract class GrpcTestBase { | |
} | |
Future<Void> startServer(BindableService service, VertxServerBuilder builder) { | |
- Promise<Void> promise = Promise.promise(); | |
server = builder | |
.addService(service) | |
- .build() | |
- .start(promise); | |
- | |
- return promise.future(); | |
+ .build(); | |
+ return server.start2(); | |
} | |
Future<Void> startServer(ServerServiceDefinition service) { | |
@@ -89,7 +86,8 @@ public abstract class GrpcTestBase { | |
void startServer(ServerServiceDefinition service, VertxServerBuilder builder, Handler<AsyncResult<Void>> completionHandler) { | |
server = builder | |
.addService(service) | |
- .build() | |
- .start(completionHandler); | |
+ .build(); | |
+ server.start2() | |
+ .onComplete(completionHandler); | |
} | |
} | |
diff --git a/vertx-grpc/src/test/java/io/vertx/ext/grpc/NativeTransportTest.java b/vertx-grpc/src/test/java/io/vertx/ext/grpc/NativeTransportTest.java | |
index ab93434..0ad3650 100644 | |
--- a/vertx-grpc/src/test/java/io/vertx/ext/grpc/NativeTransportTest.java | |
+++ b/vertx-grpc/src/test/java/io/vertx/ext/grpc/NativeTransportTest.java | |
@@ -31,7 +31,8 @@ public class NativeTransportTest { | |
VertxServerBuilder.forPort(vertx, 0) | |
.addService(new GreeterGrpc.GreeterImplBase() { }) | |
.build() | |
- .start(ctx.asyncAssertSuccess()); | |
+ .start2() | |
+ .onComplete(ctx.asyncAssertSuccess()); | |
} | |
private void assumeNativeTransport() { | |
diff --git a/vertx-grpc/src/test/java/io/vertx/ext/grpc/RpcTest.java b/vertx-grpc/src/test/java/io/vertx/ext/grpc/RpcTest.java | |
index 007eace..95e1c53 100644 | |
--- a/vertx-grpc/src/test/java/io/vertx/ext/grpc/RpcTest.java | |
+++ b/vertx-grpc/src/test/java/io/vertx/ext/grpc/RpcTest.java | |
@@ -139,8 +139,9 @@ public class RpcTest extends GrpcTestBase { | |
}; | |
server = VertxServerBuilder.forPort(vertx, port) | |
.addService(ServerInterceptors.intercept(service, BlockingServerInterceptor.wrap(vertx, blockingInterceptor))) | |
- .build() | |
- .start(ar -> { | |
+ .build(); | |
+ server | |
+ .start2().onComplete(ar -> { | |
if (ar.failed()) { | |
should.fail(ar.cause()); | |
return; | |
diff --git a/vertx-grpc/src/test/java/io/vertx/ext/grpc/VerticleTest.java b/vertx-grpc/src/test/java/io/vertx/ext/grpc/VerticleTest.java | |
index 6c80f77..4753f0b 100644 | |
--- a/vertx-grpc/src/test/java/io/vertx/ext/grpc/VerticleTest.java | |
+++ b/vertx-grpc/src/test/java/io/vertx/ext/grpc/VerticleTest.java | |
@@ -64,7 +64,7 @@ public class VerticleTest { | |
} | |
}; | |
server = VertxServerBuilder.forPort(vertx, port).addService(service).build(); | |
- server.start(startFuture); | |
+ server.start2().onComplete(startFuture); | |
} | |
@Override | |
diff --git a/src/main/java/io/vertx/amqp/impl/AmqpClientImpl.java b/src/main/java/io/vertx/amqp/impl/AmqpClientImpl.java | |
index 07fccca..470e7e7 100644 | |
--- a/src/main/java/io/vertx/amqp/impl/AmqpClientImpl.java | |
+++ b/src/main/java/io/vertx/amqp/impl/AmqpClientImpl.java | |
@@ -68,7 +68,7 @@ public class AmqpClientImpl implements AmqpClient { | |
return future; | |
} | |
- public void close(Handler<AsyncResult<Void>> handler) { | |
+ public void close(Promise<Void> handler) { | |
List<Future<Void>> actions = new ArrayList<>(); | |
for (AmqpConnection connection : connections) { | |
actions.add(connection.close()); | |
@@ -104,7 +104,7 @@ public class AmqpClientImpl implements AmqpClient { | |
} | |
public AmqpClient createReceiver(String address, | |
- Handler<AsyncResult<AmqpReceiver>> completionHandler) { | |
+ Promise<AmqpReceiver> completionHandler) { | |
return connect(res -> { | |
if (res.failed()) { | |
completionHandler.handle(res.mapEmpty()); | |
@@ -121,7 +121,7 @@ public class AmqpClientImpl implements AmqpClient { | |
return promise.future(); | |
} | |
- public AmqpClient createReceiver(String address, AmqpReceiverOptions receiverOptions, Handler<AsyncResult<AmqpReceiver>> completionHandler) { | |
+ public AmqpClient createReceiver(String address, AmqpReceiverOptions receiverOptions, Promise<AmqpReceiver> completionHandler) { | |
return connect(res -> { | |
if (res.failed()) { | |
completionHandler.handle(res.mapEmpty()); | |
@@ -138,7 +138,7 @@ public class AmqpClientImpl implements AmqpClient { | |
return promise.future(); | |
} | |
- public AmqpClient createSender(String address, Handler<AsyncResult<AmqpSender>> completionHandler) { | |
+ public AmqpClient createSender(String address, Promise<AmqpSender> completionHandler) { | |
return connect(res -> { | |
if (res.failed()) { | |
completionHandler.handle(res.mapEmpty()); | |
@@ -156,7 +156,7 @@ public class AmqpClientImpl implements AmqpClient { | |
} | |
public AmqpClient createSender(String address, AmqpSenderOptions options, | |
- Handler<AsyncResult<AmqpSender>> completionHandler) { | |
+ Promise<AmqpSender> completionHandler) { | |
return connect(res -> { | |
if (res.failed()) { | |
completionHandler.handle(res.mapEmpty()); | |
diff --git a/src/main/java/io/vertx/amqp/impl/AmqpConnectionImpl.java b/src/main/java/io/vertx/amqp/impl/AmqpConnectionImpl.java | |
index 139d943..6d2afbf 100644 | |
--- a/src/main/java/io/vertx/amqp/impl/AmqpConnectionImpl.java | |
+++ b/src/main/java/io/vertx/amqp/impl/AmqpConnectionImpl.java | |
@@ -210,7 +210,7 @@ public class AmqpConnectionImpl implements AmqpConnection { | |
return this; | |
} | |
- public AmqpConnection close(Handler<AsyncResult<Void>> done) { | |
+ public AmqpConnection close(Promise<Void> done) { | |
context.runOnContext(ignored -> { | |
ProtonConnection actualConnection = connection.get(); | |
if (actualConnection == null || closed.get() || (!isLocalOpen() && !isRemoteOpen())) { | |
@@ -268,7 +268,7 @@ public class AmqpConnectionImpl implements AmqpConnection { | |
receivers.remove(receiver); | |
} | |
- public AmqpConnection createDynamicReceiver(Handler<AsyncResult<AmqpReceiver>> completionHandler) { | |
+ public AmqpConnection createDynamicReceiver(Promise<AmqpReceiver> completionHandler) { | |
return createReceiver(null, new AmqpReceiverOptions().setDynamic(true), completionHandler); | |
} | |
@@ -279,7 +279,7 @@ public class AmqpConnectionImpl implements AmqpConnection { | |
return promise.future(); | |
} | |
- public AmqpConnection createReceiver(String address, Handler<AsyncResult<AmqpReceiver>> completionHandler) { | |
+ public AmqpConnection createReceiver(String address, Promise<AmqpReceiver> completionHandler) { | |
ProtonLinkOptions opts = new ProtonLinkOptions(); | |
runWithTrampoline(x -> { | |
@@ -305,7 +305,7 @@ public class AmqpConnectionImpl implements AmqpConnection { | |
} | |
public AmqpConnection createReceiver(String address, AmqpReceiverOptions receiverOptions, | |
- Handler<AsyncResult<AmqpReceiver>> completionHandler) { | |
+ Promise<AmqpReceiver> completionHandler) { | |
ProtonLinkOptions opts = new ProtonLinkOptions(); | |
AmqpReceiverOptions recOpts = receiverOptions == null ? new AmqpReceiverOptions() : receiverOptions; | |
opts | |
@@ -370,7 +370,7 @@ public class AmqpConnectionImpl implements AmqpConnection { | |
} | |
} | |
- public AmqpConnection createSender(String address, Handler<AsyncResult<AmqpSender>> completionHandler) { | |
+ public AmqpConnection createSender(String address, Promise<AmqpSender> completionHandler) { | |
Objects.requireNonNull(address, "The address must be set"); | |
return createSender(address, new AmqpSenderOptions(), completionHandler); | |
} | |
@@ -383,7 +383,7 @@ public class AmqpConnectionImpl implements AmqpConnection { | |
} | |
public AmqpConnection createSender(String address, AmqpSenderOptions options, | |
- Handler<AsyncResult<AmqpSender>> completionHandler) { | |
+ Promise<AmqpSender> completionHandler) { | |
if (address == null && !options.isDynamic()) { | |
throw new IllegalArgumentException("Address must be set if the link is not dynamic"); | |
} | |
@@ -431,7 +431,7 @@ public class AmqpConnectionImpl implements AmqpConnection { | |
return promise.future(); | |
} | |
- public AmqpConnection createAnonymousSender(Handler<AsyncResult<AmqpSender>> completionHandler) { | |
+ public AmqpConnection createAnonymousSender(Promise<AmqpSender> completionHandler) { | |
Objects.requireNonNull(completionHandler, "The completion handler must be set"); | |
runWithTrampoline(x -> { | |
ProtonConnection conn = connection.get(); | |
diff --git a/src/main/java/io/vertx/amqp/impl/AmqpReceiverImpl.java b/src/main/java/io/vertx/amqp/impl/AmqpReceiverImpl.java | |
index 8d2f559..40faa62 100644 | |
--- a/src/main/java/io/vertx/amqp/impl/AmqpReceiverImpl.java | |
+++ b/src/main/java/io/vertx/amqp/impl/AmqpReceiverImpl.java | |
@@ -68,7 +68,7 @@ public class AmqpReceiverImpl implements AmqpReceiver { | |
AmqpConnectionImpl connection, | |
AmqpReceiverOptions options, | |
ProtonReceiver receiver, | |
- Handler<AsyncResult<AmqpReceiver>> completionHandler) { | |
+ Promise<AmqpReceiver> completionHandler) { | |
this.address = address; | |
this.receiver = receiver; | |
this.connection = connection; | |
@@ -310,10 +310,10 @@ public class AmqpReceiverImpl implements AmqpReceiver { | |
return connection; | |
} | |
- public void close(Handler<AsyncResult<Void>> handler) { | |
- Handler<AsyncResult<Void>> actualHandler; | |
+ public void close(Promise<Void> handler) { | |
+ Promise<Void> actualHandler; | |
if (handler == null) { | |
- actualHandler = x -> { /* NOOP */ }; | |
+ actualHandler = Promise.promise(); | |
} else { | |
actualHandler = handler; | |
} | |
diff --git a/src/main/java/io/vertx/amqp/impl/AmqpSenderImpl.java b/src/main/java/io/vertx/amqp/impl/AmqpSenderImpl.java | |
index 9d32620..f3d2c9e 100644 | |
--- a/src/main/java/io/vertx/amqp/impl/AmqpSenderImpl.java | |
+++ b/src/main/java/io/vertx/amqp/impl/AmqpSenderImpl.java | |
@@ -40,7 +40,7 @@ public class AmqpSenderImpl implements AmqpSender { | |
private long remoteCredit = 0; | |
private AmqpSenderImpl(ProtonSender sender, AmqpConnectionImpl connection, | |
- Handler<AsyncResult<AmqpSender>> completionHandler) { | |
+ Promise<AmqpSender> completionHandler) { | |
this.sender = sender; | |
this.connection = connection; | |
@@ -86,7 +86,7 @@ public class AmqpSenderImpl implements AmqpSender { | |
* @param completionHandler the completion handler | |
*/ | |
static void create(ProtonSender sender, AmqpConnectionImpl connection, | |
- Handler<AsyncResult<AmqpSender>> completionHandler) { | |
+ Promise<AmqpSender> completionHandler) { | |
new AmqpSenderImpl(sender, connection, completionHandler); | |
} | |
@@ -137,17 +137,13 @@ public class AmqpSenderImpl implements AmqpSender { | |
return doSend(message, null); | |
} | |
- private AmqpSender doSend(AmqpMessage message, Handler<AsyncResult<Void>> acknowledgmentHandler) { | |
+ private AmqpSender doSend(AmqpMessage message, Promise<Void> acknowledgmentHandler) { | |
Handler<ProtonDelivery> ack = delivery -> { | |
DeliveryState remoteState = delivery.getRemoteState(); | |
- Handler<AsyncResult<Void>> handler = acknowledgmentHandler; | |
+ Promise<Void> handler = acknowledgmentHandler; | |
if (acknowledgmentHandler == null) { | |
- handler = ar -> { | |
- if (ar.failed()) { | |
- LOGGER.warn("Message rejected by remote peer", ar.cause()); | |
- } | |
- }; | |
+ handler = Promise.promise(); | |
} | |
if (remoteState == null) { | |
@@ -233,7 +229,7 @@ public class AmqpSenderImpl implements AmqpSender { | |
return this; | |
} | |
- public AmqpSender sendWithAck(AmqpMessage message, Handler<AsyncResult<Void>> acknowledgementHandler) { | |
+ public AmqpSender sendWithAck(AmqpMessage message, Promise<Void> acknowledgementHandler) { | |
return doSend(message, acknowledgementHandler); | |
} | |
@@ -244,10 +240,10 @@ public class AmqpSenderImpl implements AmqpSender { | |
return promise.future(); | |
} | |
- public void close(Handler<AsyncResult<Void>> handler) { | |
- Handler<AsyncResult<Void>> actualHandler; | |
+ public void close(Promise<Void> handler) { | |
+ Promise<Void> actualHandler; | |
if (handler == null) { | |
- actualHandler = x -> { /* NOOP */ }; | |
+ actualHandler = Promise.promise(); | |
} else { | |
actualHandler = handler; | |
} | |
diff --git a/src/main/java/io/vertx/kafka/client/common/impl/CloseHandler.java b/src/main/java/io/vertx/kafka/client/common/impl/CloseHandler.java | |
index 7f5b89d..ebe2472 100644 | |
--- a/src/main/java/io/vertx/kafka/client/common/impl/CloseHandler.java | |
+++ b/src/main/java/io/vertx/kafka/client/common/impl/CloseHandler.java | |
@@ -15,10 +15,7 @@ | |
*/ | |
package io.vertx.kafka.client.common.impl; | |
-import io.vertx.core.AsyncResult; | |
-import io.vertx.core.Closeable; | |
-import io.vertx.core.Future; | |
-import io.vertx.core.Handler; | |
+import io.vertx.core.*; | |
import io.vertx.core.internal.ContextInternal; | |
import io.vertx.core.internal.VertxInternal; | |
@@ -34,9 +31,9 @@ public class CloseHandler { | |
private Closeable closeable; | |
private Runnable closeableHookCleanup; | |
- private final BiConsumer<Long, Handler<AsyncResult<Void>>> close; | |
+ private final BiConsumer<Long, Promise<Void>> close; | |
- public CloseHandler(BiConsumer<Long, Handler<AsyncResult<Void>>> close) { | |
+ public CloseHandler(BiConsumer<Long, Promise<Void>> close) { | |
this.close = close; | |
} | |
@@ -80,15 +77,15 @@ public class CloseHandler { | |
public void close() { | |
unregisterCloseHook(); | |
- close.accept(0L, ar -> {}); | |
+ close.accept(0L, Promise.promise()); | |
} | |
- public void close(Handler<AsyncResult<Void>> completionHandler) { | |
+ public void close(Promise<Void> completionHandler) { | |
unregisterCloseHook(); | |
close.accept(0L, completionHandler); | |
} | |
- public void close(long timeout, Handler<AsyncResult<Void>> completionHandler) { | |
+ public void close(long timeout, Promise<Void> completionHandler) { | |
unregisterCloseHook(); | |
close.accept(timeout, completionHandler); | |
} | |
diff --git a/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java b/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java | |
index 0cc9b98..1386792 100644 | |
--- a/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java | |
+++ b/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java | |
@@ -111,12 +111,12 @@ public class KafkaReadStreamImpl<K, V> implements KafkaReadStream<K, V> { | |
this.tracer = ConsumerTracer.create(ctxInt.tracer(), options); | |
} | |
- private <T> void start(java.util.function.BiConsumer<Consumer<K, V>, Promise<T>> task, Handler<AsyncResult<T>> handler) { | |
+ private <T> void start(java.util.function.BiConsumer<Consumer<K, V>, Promise<T>> task, Promise<T> handler) { | |
this.worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "vert.x-kafka-consumer-thread-" + threadCount.getAndIncrement())); | |
this.submitTaskWhenStarted(task, handler); | |
} | |
- private <T> void submitTaskWhenStarted(java.util.function.BiConsumer<Consumer<K, V>, Promise<T>> task, Handler<AsyncResult<T>> handler) { | |
+ private <T> void submitTaskWhenStarted(java.util.function.BiConsumer<Consumer<K, V>, Promise<T>> task, Promise<T> handler) { | |
if (worker == null) { | |
throw new IllegalStateException(); | |
} | |
@@ -264,7 +264,7 @@ public class KafkaReadStreamImpl<K, V> implements KafkaReadStream<K, V> { | |
} | |
protected <T> void submitTask(java.util.function.BiConsumer<Consumer<K, V>, Promise<T>> task, | |
- Handler<AsyncResult<T>> handler) { | |
+ Promise<T> handler) { | |
if (this.closed.compareAndSet(true, false)) { | |
this.start(task, handler); | |
} else { | |
diff --git a/src/main/java/io/vertx/rabbitmq/impl/RabbitMQClientImpl.java b/src/main/java/io/vertx/rabbitmq/impl/RabbitMQClientImpl.java | |
index 50b4fff..5f1fafe 100644 | |
--- a/src/main/java/io/vertx/rabbitmq/impl/RabbitMQClientImpl.java | |
+++ b/src/main/java/io/vertx/rabbitmq/impl/RabbitMQClientImpl.java | |
@@ -203,7 +203,8 @@ public class RabbitMQClientImpl implements RabbitMQClient, ShutdownListener { | |
if (handler.queue().isCancelled()) { | |
return; | |
} | |
- restartConnect(0, rh -> { | |
+ Future.<Void>future(p -> restartConnect(0, p)) | |
+ .onComplete(rh -> { | |
forChannel(chan -> { | |
RabbitMQConsumer q = handler.queue(); | |
chan.basicConsume(q.queueName(), options.isAutoAck(), options.getConsumerTag(), | |
@@ -234,7 +235,7 @@ public class RabbitMQClientImpl implements RabbitMQClient, ShutdownListener { | |
* @param attempts number of attempts | |
* @param resultHandler handler called when operation is done with a result of the operation | |
*/ | |
- private void restartConnect(int attempts, Handler<AsyncResult<Void>> resultHandler) { | |
+ private void restartConnect(int attempts, Promise<Void> resultHandler) { | |
if (retries == 0) { | |
log.error("Retries disabled. Will not attempt to restart"); | |
resultHandler.handle(Future.failedFuture("Retries disabled. Will not attempt to restart")); | |
@@ -261,7 +262,7 @@ public class RabbitMQClientImpl implements RabbitMQClient, ShutdownListener { | |
} | |
- private void execRestart(int attempts, Handler<AsyncResult<Void>> resultHandler) { | |
+ private void execRestart(int attempts, Promise<Void> resultHandler) { | |
stop().onComplete(ar -> { | |
if (ar.succeeded()) { | |
if (attempts >= retries) { | |
@@ -659,7 +660,7 @@ public class RabbitMQClientImpl implements RabbitMQClient, ShutdownListener { | |
} | |
log.info("RabbitMQ connection shutdown! The client will attempt to reconnect automatically", cause); | |
//Make sure to perform reconnection | |
- restartConnect(0, rh -> { | |
+ Future.<Void>future(p -> restartConnect(0, p)).onComplete(rh -> { | |
log.info("reconnect success"); | |
}); | |
} | |
diff --git a/src/main/java/io/vertx/rabbitmq/impl/RabbitMQPublisherImpl.java b/src/main/java/io/vertx/rabbitmq/impl/RabbitMQPublisherImpl.java | |
index 5f83683..85a05f4 100644 | |
--- a/src/main/java/io/vertx/rabbitmq/impl/RabbitMQPublisherImpl.java | |
+++ b/src/main/java/io/vertx/rabbitmq/impl/RabbitMQPublisherImpl.java | |
@@ -63,13 +63,13 @@ public class RabbitMQPublisherImpl implements RabbitMQPublisher, ReadStream<Rabb | |
private final String routingKey; | |
private final BasicProperties properties; | |
private final Buffer message; | |
- private final Handler<AsyncResult<Void>> publishHandler; | |
- private final Handler<AsyncResult<Long>> confirmHandler; | |
+ private final Promise<Void> publishHandler; | |
+ private final Promise<Long> confirmHandler; | |
private volatile long deliveryTag; | |
MessageDetails(String exchange, String routingKey, BasicProperties properties, Buffer message, | |
- Handler<AsyncResult<Void>> publishHandler, | |
- Handler<AsyncResult<Long>> confirmHandler) { | |
+ Promise<Void> publishHandler, | |
+ Promise<Long> confirmHandler) { | |
this.exchange = exchange; | |
this.routingKey = routingKey; | |
this.properties = properties; | |
@@ -113,7 +113,7 @@ public class RabbitMQPublisherImpl implements RabbitMQPublisher, ReadStream<Rabb | |
return promise.future(); | |
} | |
- private void stop(Handler<AsyncResult<Void>> resultHandler) { | |
+ private void stop(Promise<Void> resultHandler) { | |
stopped = true; | |
sendQueue.pause(); | |
if (sendQueue.isEmpty()) { |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment