Skip to content

Instantly share code, notes, and snippets.

@spmallette
Last active October 17, 2022 16:57
Show Gist options
  • Save spmallette/e9e2839a21613daceb7dd4a41f22c330 to your computer and use it in GitHub Desktop.
Save spmallette/e9e2839a21613daceb7dd4a41f22c330 to your computer and use it in GitHub Desktop.
driver refactoring

Clearly Identify a Server-side Close

Added a "Server closed the Connection on channel … - scheduling removal from ConnectionPool" log message, which makes it clear that the server, and not the client, initiated the close of a Connection. The server might initiate that close for a variety of reasons, such as a clean server shutdown or the server-side idleConnectionTimeout being exceeded.

Logs for a server-side close now appear as:

[ERROR] org.apache.tinkerpop.gremlin.driver.Connection - Server closed the Connection on channel d852f267 - scheduling removal from ConnectionPool (Host{address=localhost/127.0.0.1:45940, hostUri=ws://localhost:45940/gremlin})
Connection Pool Status (size=4 max=4 min=4)
> Connection{channel=99d1d324, isDead=false, borrowed=0, pending=0}
==> Connection{channel=d852f267, isDead=true, borrowed=0, pending=0}
> Connection{channel=4449ca7d, isDead=false, borrowed=0, pending=0}
> Connection{channel=26d9972e, isDead=true, borrowed=0, pending=0}

Thread Pool Refactoring

Replaced two single-threaded executors (one generally used for scheduling retries and another used in ConnectionPool initialization) and sporadic use of the fork-join pool with two separate thread pools: one generally for use by the Host instances and one generally used for Connection management. Each is sized a bit more aggressively to ensure the driver has the resources it needs to initialize and recover from failures.

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.server;
import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizerIntegrateTest;
import org.apache.tinkerpop.gremlin.server.op.OpLoader;
import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assume.assumeThat;
/**
* Starts and stops an instance for each executed test.
*
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
public abstract class AbstractGremlinServerIntegrationTest {

    // key/trust store locations and types shared by the SSL related integration tests
    public static final String KEY_PASS = "changeit";
    public static final String JKS_SERVER_KEY = "src/test/resources/server-key.jks";
    public static final String JKS_SERVER_TRUST = "src/test/resources/server-trust.jks";
    public static final String JKS_CLIENT_KEY = "src/test/resources/client-key.jks";
    public static final String JKS_CLIENT_TRUST = "src/test/resources/client-trust.jks";
    public static final String P12_SERVER_KEY = "src/test/resources/server-key.p12";
    public static final String P12_SERVER_TRUST = "src/test/resources/server-trust.p12";
    public static final String P12_CLIENT_KEY = "src/test/resources/client-key.p12";
    public static final String P12_CLIENT_TRUST = "src/test/resources/client-trust.p12";
    public static final String KEYSTORE_TYPE_JKS = "jks";
    public static final String KEYSTORE_TYPE_PKCS12 = "pkcs12";
    public static final String TRUSTSTORE_TYPE_JKS = "jks";
    public static final String TRUSTSTORE_TYPE_PKCS12 = "pkcs12";

    /**
     * The server under test. It is assigned by the {@code startServer} variants and therefore remains
     * {@code null} if startup fails before the instance is constructed.
     */
    protected GremlinServer server;

    /**
     * The settings produced by the most recent {@link #startServerAsync()} call. Note that
     * {@link #startServer(Settings)} with a non-null argument does not update this field.
     */
    private Settings overriddenSettings;

    private static final String EPOLL_OPTION = "gremlin.server.epoll";
    private static final boolean GREMLIN_SERVER_EPOLL = "true".equalsIgnoreCase(System.getProperty(EPOLL_OPTION));
    private static final Logger logger = LoggerFactory.getLogger(AbstractGremlinServerIntegrationTest.class);

    @Rule
    public TestName name = new TestName();

    /**
     * Hook for subclasses to adjust the {@link Settings} before the server is constructed. The default
     * implementation returns the supplied instance unchanged.
     */
    public Settings overrideSettings(final Settings settings) {
        return settings;
    }

    /**
     * This method may be called after {@link #startServer()} to (re-)set the evaluation timeout in
     * the running server.
     *
     * @param timeoutInMillis new evaluation timeout
     */
    protected void overrideEvaluationTimeout(final long timeoutInMillis) {
        // Note: overriding settings in a running server is not guaranteed to work for all settings.
        // It works for the evaluation timeout, though, because GremlinExecutor is re-created for each evaluation.
        overriddenSettings.evaluationTimeout = timeoutInMillis;
    }

    /**
     * Supplies the YAML configuration that {@link #startServerAsync()} reads the base {@link Settings} from.
     */
    public InputStream getSettingsInputStream() {
        return AbstractGremlinServerIntegrationTest.class.getResourceAsStream("gremlin-server-integration.yaml");
    }

    /**
     * Starts a server with the default (possibly overridden) settings before each test.
     */
    @Before
    public void setUp() throws Exception {
        logger.info("Starting: {}", name.getMethodName());
        startServer();
    }

    /**
     * Variant of {@link #setUp()} that starts the server with the supplied settings.
     */
    public void setUp(final Settings settings) throws Exception {
        logger.info("Starting: {}", name.getMethodName());
        startServer(settings);
    }

    /**
     * Starts the server with the supplied settings and blocks until startup completes. A {@code null} argument
     * falls back to {@link #startServer()}.
     */
    public void startServer(final Settings settings) throws Exception {
        if (null == settings) {
            startServer();
            return;
        }

        final Settings oSettings = overrideSettings(settings);
        if (shouldTestUnified()) {
            oSettings.channelizer = UnifiedChannelizer.class.getName();
        }
        ServerTestHelper.rewritePathsInGremlinServerSettings(oSettings);
        if (GREMLIN_SERVER_EPOLL) {
            oSettings.useEpollEventLoop = true;
        }
        this.server = new GremlinServer(oSettings);
        server.start().join();
    }

    /**
     * Starts the server with settings read from {@link #getSettingsInputStream()} and blocks until startup
     * completes.
     */
    public void startServer() throws Exception {
        startServerAsync().join();
    }

    /**
     * Reads the settings from {@link #getSettingsInputStream()}, applies {@link #overrideSettings(Settings)} and
     * the system property driven options, then begins server startup.
     *
     * @return the future returned by {@code GremlinServer.start()}
     */
    public CompletableFuture<ServerGremlinExecutor> startServerAsync() throws Exception {
        final InputStream stream = getSettingsInputStream();
        final Settings settings = Settings.read(stream);
        overriddenSettings = overrideSettings(settings);
        ServerTestHelper.rewritePathsInGremlinServerSettings(overriddenSettings);
        if (GREMLIN_SERVER_EPOLL) {
            overriddenSettings.useEpollEventLoop = true;
        }
        if (shouldTestUnified()) {
            overriddenSettings.channelizer = UnifiedChannelizer.class.getName();
        }
        this.server = new GremlinServer(overriddenSettings);
        return server.start();
    }

    /**
     * Stops the server after each test.
     */
    @After
    public void tearDown() throws Exception {
        stopServer();
        logger.info("Ending: {}", name.getMethodName());
    }

    /**
     * Clears any {@link TinkerGraph} instances, stops the server (if one was created) and resets the
     * {@link OpLoader}.
     */
    public void stopServer() throws Exception {
        // the server reference is null when startup failed before the instance was constructed. the guard has to
        // cover the graph cleanup as well, otherwise a NullPointerException raised here would obscure the
        // original startup failure
        if (server != null) {
            // calling close() on TinkerGraph does not free resources quickly enough. adding a clear() call lets gc
            // cleanup earlier
            server.getServerGremlinExecutor().getGraphManager().getAsBindings().values().stream()
                    .filter(g -> g instanceof TinkerGraph).forEach(g -> ((TinkerGraph) g).clear());
            server.stop().join();
        }

        // reset the OpLoader processors so that they can get reconfigured on startup - Settings may have changed
        // between tests
        OpLoader.reset();
    }

    /**
     * Determines if the running server was configured with the {@link UnifiedChannelizer}.
     */
    protected boolean isUsingUnifiedChannelizer() {
        // constant-first comparison so that an unset channelizer yields false rather than throwing
        return UnifiedChannelizer.class.getName().equals(
                server.getServerGremlinExecutor().getSettings().channelizer);
    }

    /**
     * Recursively deletes the contents of the directory and then the directory itself.
     *
     * @return the result of deleting {@code directory} itself; failures to delete individual children are not
     * reported separately
     */
    public static boolean deleteDirectory(final File directory) {
        if (directory.exists()) {
            // listFiles() returns null if the path is not a directory or an I/O error occurs
            final File[] files = directory.listFiles();
            if (null != files) {
                for (final File file : files) {
                    if (file.isDirectory()) {
                        deleteDirectory(file);
                    } else {
                        file.delete();
                    }
                }
            }
        }
        return directory.delete();
    }

    /**
     * Registers an empty Neo4j graph under the name "graph" when the Neo4j implementation is on the classpath,
     * after removing any previous data directory.
     */
    protected static void tryIncludeNeo4jGraph(final Settings settings) {
        if (isNeo4jPresent()) {
            deleteDirectory(new File("/tmp/neo4j"));
            settings.graphs.put("graph", "conf/neo4j-empty.properties");
        }
    }

    /**
     * Determines if the Neo4j implementation class can be loaded.
     */
    protected static boolean isNeo4jPresent() {
        try {
            Class.forName("org.neo4j.tinkerpop.api.impl.Neo4jGraphAPIImpl");
            return true;
        } catch (Throwable ex) {
            // any failure to load the class simply means that Neo4j is not available for testing
            return false;
        }
    }

    /**
     * Skips the calling test (by way of a JUnit assumption) when Neo4j is not available.
     */
    protected static void assumeNeo4jIsPresent() {
        final boolean neo4jIncludedForTesting = isNeo4jPresent();
        assumeThat("Neo4j implementation was not included for testing - run with -DincludeNeo4j", neo4jIncludedForTesting, is(true));
    }

    private boolean shouldTestUnified() {
        // ignore all tests in the UnifiedChannelizerIntegrateTest package as they are already rigged to test
        // over the various channelizer implementations
        return Boolean.parseBoolean(System.getProperty("testUnified", "false")) &&
                !this.getClass().getPackage().equals(UnifiedChannelizerIntegrateTest.class.getPackage());
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.driver;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import javax.net.ssl.SSLException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
* A {@code Client} is constructed from a {@link Cluster} and represents a way to send messages to Gremlin Server.
* This class itself is a base class as there are different implementations that provide differing kinds of
* functionality. See the implementations for specifics on their individual usage.
* <p/>
* The {@code Client} is designed to be re-used and shared across threads.
*
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
public abstract class Client {
// logger shared by this class and referenced from the nested client implementations
private static final Logger logger = LoggerFactory.getLogger(Client.class);
// the Cluster supplied at construction; exposed through getCluster()
protected final Cluster cluster;
// set to true as the last step of init(); also read outside of a synchronized block in submitAsync(RequestMessage)
protected volatile boolean initialized;
// the settings supplied at construction; exposed through getSettings()
protected final Client.Settings settings;
Client(final Cluster cluster, final Client.Settings settings) {
    // both arguments are captured as supplied - no validation or initialization work happens at construction
    this.settings = settings;
    this.cluster = cluster;
}
/**
* Makes any initial changes to the builder and returns the constructed {@link RequestMessage}. Implementers
* may choose to override this method to append data to the request before sending. By default, this method
* will simply return the {@code builder} passed in by the caller.
*/
public RequestMessage.Builder buildMessage(final RequestMessage.Builder builder) {
// base implementation is a pass-through: the very same builder instance is handed back unmodified
return builder;
}
/**
* Called in the {@link #init} method.
*/
// invoked by init() after cluster.init() returns and before the client is flagged as initialized
protected abstract void initializeImplementation();
/**
* Chooses a {@link Connection} to write the message to.
*/
// invoked by submitAsync(RequestMessage); the message is written to the Connection returned from here
protected abstract Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException;
/**
* Asynchronous close of the {@code Client}.
*/
// close() is the blocking counterpart: it calls join() on the future returned from here
public abstract CompletableFuture<Void> closeAsync();
/**
* Create a new {@code Client} that aliases the specified {@link Graph} or {@link TraversalSource} name on the
* server to a variable called "g" for the context of the requests made through that {@code Client}.
*
* @param graphOrTraversalSource rebinds the specified global Gremlin Server variable to "g"
*/
public Client alias(final String graphOrTraversalSource) {
    // bind the supplied server-side name to "g" and defer to the Map based overload
    final Map<String, String> gAlias = makeDefaultAliasMap(graphOrTraversalSource);
    return alias(gAlias);
}
/**
* Creates a {@code Client} that supplies the specified set of aliases, thus allowing the user to re-name
* one or more globally defined {@link Graph} or {@link TraversalSource} server bindings for the context of
* the created {@code Client}.
*/
public Client alias(final Map<String, String> aliases) {
    // the returned client wraps this one together with the aliases and this client's settings
    final Client aliased = new AliasClusteredClient(this, aliases, settings);
    return aliased;
}
/**
* Submit a {@link Traversal} to the server for remote execution. Results are returned as {@link Traverser}
* instances and are therefore bulked, meaning that to properly iterate the contents of the result each
* {@link Traverser#bulk()} must be examined to determine the number of times that object should be presented in
* iteration.
*/
public ResultSet submit(final Traversal traversal) {
    try {
        // block until the asynchronous submission yields its ResultSet
        final CompletableFuture<ResultSet> pending = submitAsync(traversal);
        return pending.get();
    } catch (Exception ex) {
        // unchecked failures surface as-is; only checked ones get wrapped
        throw (ex instanceof RuntimeException) ? (RuntimeException) ex : new RuntimeException(ex);
    }
}
/**
* An asynchronous version of {@link #submit(Traversal)}. Results are returned as {@link Traverser} instances and
* are therefore bulked, meaning that to properly iterate the contents of the result each {@link Traverser#bulk()}
* must be examined to determine the number of times that object should be presented in iteration.
*/
public CompletableFuture<ResultSet> submitAsync(final Traversal traversal) {
    // this base implementation unconditionally rejects the call
    final String reason = "This implementation does not support Traversal submission - use a sessionless Client created with from the alias() method";
    throw new UnsupportedOperationException(reason);
}
/**
* Submit a {@link Bytecode} to the server for remote execution. Results are returned as {@link Traverser}
* instances and are therefore bulked, meaning that to properly iterate the contents of the result each
* {@link Traverser#bulk()} must be examined to determine the number of times that object should be presented in
* iteration.
*/
public ResultSet submit(final Bytecode bytecode) {
    try {
        // block until the asynchronous submission yields its ResultSet
        final CompletableFuture<ResultSet> pending = submitAsync(bytecode);
        return pending.get();
    } catch (Exception ex) {
        // unchecked failures surface as-is; only checked ones get wrapped
        throw (ex instanceof RuntimeException) ? (RuntimeException) ex : new RuntimeException(ex);
    }
}
/**
* A version of {@link #submit(Bytecode)} which provides the ability to set per-request options.
*
* @param bytecode request in the form of gremlin {@link Bytecode}
* @param options for the request
* @see #submit(Bytecode)
*/
public ResultSet submit(final Bytecode bytecode, final RequestOptions options) {
    try {
        // block until the asynchronous submission yields its ResultSet
        final CompletableFuture<ResultSet> pending = submitAsync(bytecode, options);
        return pending.get();
    } catch (Exception ex) {
        // unchecked failures surface as-is; only checked ones get wrapped
        throw (ex instanceof RuntimeException) ? (RuntimeException) ex : new RuntimeException(ex);
    }
}
/**
* An asynchronous version of {@link #submit(Bytecode)}. Results are returned as {@link Traverser} instances and
* are therefore bulked, meaning that to properly iterate the contents of the result each {@link Traverser#bulk()}
* must be examined to determine the number of times that object should be presented in iteration.
*/
public CompletableFuture<ResultSet> submitAsync(final Bytecode bytecode) {
    // this base implementation unconditionally rejects the call
    final String reason = "This implementation does not support Traversal submission - use a sessionless Client created with from the alias() method";
    throw new UnsupportedOperationException(reason);
}
/**
* A version of {@link #submitAsync(Bytecode)} which provides the ability to set per-request options.
*
* @param bytecode request in the form of gremlin {@link Bytecode}
* @param options for the request
* @see #submitAsync(Bytecode)
*/
public CompletableFuture<ResultSet> submitAsync(final Bytecode bytecode, final RequestOptions options) {
    // this base implementation unconditionally rejects the call
    final String reason = "This implementation does not support Traversal submission - use a sessionless Client created with from the alias() method";
    throw new UnsupportedOperationException(reason);
}
/**
* Initializes the client which typically means that a connection is established to the server. Depending on the
* implementation and configuration this blocking call may take some time. This method will be called
* automatically if it is not called directly, and calling it more than once has no additional effect.
*/
public synchronized Client init() {
    // the method is synchronized, so the work below is carried out by one caller at a time
    if (!initialized) {
        logger.debug("Initializing client on cluster [{}]", cluster);
        cluster.init();
        initializeImplementation();
        // the flag is raised last, so it stays false if either call above throws
        initialized = true;
    }
    return this;
}
/**
* Submits a Gremlin script to the server and returns a {@link ResultSet} once the write of the request is
* complete.
*
* @param gremlin the gremlin script to execute
*/
public ResultSet submit(final String gremlin) {
    // no per-request configuration is supplied, so the shared empty options instance is used
    final RequestOptions noOptions = RequestOptions.EMPTY;
    return submit(gremlin, noOptions);
}
/**
* Submits a Gremlin script and bound parameters to the server and returns a {@link ResultSet} once the write of
* the request is complete. If a script is to be executed repeatedly with slightly different arguments, prefer
* this method to concatenating a Gremlin script from dynamically produced strings and sending it to
* {@link #submit(String)}. Parameterized scripts will perform better.
*
* @param gremlin the gremlin script to execute
* @param parameters a map of parameters that will be bound to the script on execution
*/
public ResultSet submit(final String gremlin, final Map<String, Object> parameters) {
    try {
        // block until the asynchronous submission yields its ResultSet
        final CompletableFuture<ResultSet> pending = submitAsync(gremlin, parameters);
        return pending.get();
    } catch (Exception ex) {
        // unchecked failures surface as-is; only checked ones get wrapped
        throw (ex instanceof RuntimeException) ? (RuntimeException) ex : new RuntimeException(ex);
    }
}
/**
* Submits a Gremlin script to the server and returns a {@link ResultSet} once the write of the request is
* complete.
*
* @param gremlin the gremlin script to execute
* @param options for the request
*/
public ResultSet submit(final String gremlin, final RequestOptions options) {
    try {
        // block until the asynchronous submission yields its ResultSet
        final CompletableFuture<ResultSet> pending = submitAsync(gremlin, options);
        return pending.get();
    } catch (Exception ex) {
        // unchecked failures surface as-is; only checked ones get wrapped
        throw (ex instanceof RuntimeException) ? (RuntimeException) ex : new RuntimeException(ex);
    }
}
/**
* The asynchronous version of {@link #submit(String)} where the returned future will complete when the
* write of the request completes.
*
* @param gremlin the gremlin script to execute
*/
public CompletableFuture<ResultSet> submitAsync(final String gremlin) {
    // a freshly built options instance with nothing configured on it
    final RequestOptions defaults = RequestOptions.build().create();
    return submitAsync(gremlin, defaults);
}
/**
* The asynchronous version of {@link #submit(String, Map)} where the returned future will complete when the
* write of the request completes.
*
* @param gremlin the gremlin script to execute
* @param parameters a map of parameters that will be bound to the script on execution
*/
public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Map<String, Object> parameters) {
    final RequestOptions.Builder options = RequestOptions.build();

    // copy each supplied binding onto the request options; a null or empty map contributes nothing
    final boolean hasParameters = parameters != null && !parameters.isEmpty();
    if (hasParameters) {
        for (final Map.Entry<String, Object> binding : parameters.entrySet()) {
            options.addParameter(binding.getKey(), binding.getValue());
        }
    }

    return submitAsync(gremlin, options.create());
}
/**
* The asynchronous version of {@link #submit(String, Map)} where the returned future will complete when the
* write of the request completes.
*
* @param gremlin the gremlin script to execute
* @param parameters a map of parameters that will be bound to the script on execution
* @param graphOrTraversalSource rebinds the specified global Gremlin Server variable to "g"
* @deprecated As of release 3.4.0, replaced by {@link #submitAsync(String, RequestOptions)}.
*/
@Deprecated
public CompletableFuture<ResultSet> submitAsync(final String gremlin, final String graphOrTraversalSource,
                                                final Map<String, Object> parameters) {
    // only a non-empty name results in an alias map; otherwise null is passed along to the Map based overload
    final boolean hasSource = graphOrTraversalSource != null && !graphOrTraversalSource.isEmpty();
    final Map<String, String> aliases = hasSource ? makeDefaultAliasMap(graphOrTraversalSource) : null;
    return submitAsync(gremlin, aliases, parameters);
}
/**
* The asynchronous version of {@link #submit(String, Map)} where the returned future will complete when the
* write of the request completes.
*
* @param gremlin the gremlin script to execute
* @param parameters a map of parameters that will be bound to the script on execution
* @param aliases aliases the specified global Gremlin Server variables to other names that can then be used in
* the script, where the key is the alias name and the value represents the global variable on the
* server
* @deprecated As of release 3.4.0, replaced by {@link #submitAsync(String, RequestOptions)}.
*/
@Deprecated
public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Map<String, String> aliases,
                                                final Map<String, Object> parameters) {
    final RequestOptions.Builder options = RequestOptions.build();

    // aliases go on first, then the bindings; null or empty maps contribute nothing
    final boolean hasAliases = aliases != null && !aliases.isEmpty();
    if (hasAliases) {
        for (final Map.Entry<String, String> alias : aliases.entrySet()) {
            options.addAlias(alias.getKey(), alias.getValue());
        }
    }

    final boolean hasParameters = parameters != null && !parameters.isEmpty();
    if (hasParameters) {
        for (final Map.Entry<String, Object> binding : parameters.entrySet()) {
            options.addParameter(binding.getKey(), binding.getValue());
        }
    }

    // this overload always pins the batch size to the value configured for the connection pool
    final int configuredBatchSize = cluster.connectionPoolSettings().resultIterationBatchSize;
    options.batchSize(configuredBatchSize);

    return submitAsync(gremlin, options.create());
}
/**
* The asynchronous version of {@link #submit(String, RequestOptions)} where the returned future will complete when the
* write of the request completes.
*
* @param gremlin the gremlin script to execute
* @param options the options to supply for this request
*/
public CompletableFuture<ResultSet> submitAsync(final String gremlin, final RequestOptions options) {
    // the pool-level batch size is the fallback when the request does not carry its own
    final int defaultBatchSize = cluster.connectionPoolSettings().resultIterationBatchSize;
    final int batchSize = options.getBatchSize().orElse(defaultBatchSize);

    // buildMessage() has to run first so that client specific configuration is in place before the request
    // specific values are added, which lets the latter override the former
    final RequestMessage.Builder evalRequest = buildMessage(RequestMessage.build(Tokens.OPS_EVAL));
    final RequestMessage.Builder request = evalRequest
            .add(Tokens.ARGS_GREMLIN, gremlin)
            .add(Tokens.ARGS_BATCH_SIZE, batchSize);

    // each optional setting is only written to the request when it was actually supplied
    options.getTimeout().ifPresent(millis -> request.add(Tokens.ARGS_EVAL_TIMEOUT, millis));
    options.getParameters().ifPresent(bindings -> request.addArg(Tokens.ARGS_BINDINGS, bindings));
    options.getAliases().ifPresent(aliasMap -> request.addArg(Tokens.ARGS_ALIASES, aliasMap));
    options.getOverrideRequestId().ifPresent(id -> request.overrideRequestId(id));
    options.getUserAgent().ifPresent(agent -> request.addArg(Tokens.ARGS_USER_AGENT, agent));
    options.getLanguage().ifPresent(language -> request.addArg(Tokens.ARGS_LANGUAGE, language));

    return submitAsync(request.create());
}
/**
* A low-level method that allows the submission of a manually constructed {@link RequestMessage}.
*/
public CompletableFuture<ResultSet> submitAsync(final RequestMessage msg) {
    if (isClosing()) {
        throw new IllegalStateException("Client is closed");
    }

    // initialization happens lazily on first submission if init() was not called explicitly
    if (!initialized) {
        init();
    }

    final CompletableFuture<ResultSet> resultFuture = new CompletableFuture<>();
    Connection chosen = null;
    try {
        // the connection is returned to the pool once the response has been completed...see Connection.write()
        // the connection may be returned to the pool with the host being marked as "unavailable"
        chosen = chooseConnection(msg);
        chosen.write(msg, resultFuture);
        return resultFuture;
    } catch (Exception ex) {
        // unchecked failures surface as-is; only checked ones get wrapped
        throw (ex instanceof RuntimeException) ? (RuntimeException) ex : new RuntimeException(ex);
    } finally {
        // runs on both the success and the failure path; "chosen" is still null if no connection was obtained
        if (logger.isDebugEnabled()) {
            final String target = null == chosen ? "connection not initialized" : chosen.toString();
            logger.debug("Submitted {} to - {}", msg, target);
        }
    }
}
// consulted first by submitAsync(RequestMessage), which throws IllegalStateException when this returns true
public abstract boolean isClosing();
/**
* Closes the client by making a synchronous call to {@link #closeAsync()}.
*/
public void close() {
    // start the asynchronous close and then block the caller until it finishes
    final CompletableFuture<Void> closed = closeAsync();
    closed.join();
}
/**
* Gets the {@link Client.Settings}.
*/
public Settings getSettings() {
    // the instance handed to the constructor, returned without copying
    return this.settings;
}
/**
* Gets the {@link Cluster} that spawned this {@code Client}.
*/
public Cluster getCluster() {
    // the instance handed to the constructor, returned without copying
    return this.cluster;
}
protected Map<String, String> makeDefaultAliasMap(final String graphOrTraversalSource) {
    // a new, mutable map holding the single "g" binding is produced on every call
    return new HashMap<>(Collections.singletonMap("g", graphOrTraversalSource));
}
/**
* A {@code Client} implementation that does not operate in a session. Requests are sent to multiple servers
* given a {@link LoadBalancingStrategy}. Transactions are automatically committed
* (or rolled-back on error) after each request.
*/
public final static class ClusteredClient extends Client {
// one ConnectionPool per Host; an entry is added once the pool for that host was constructed successfully
protected ConcurrentMap<Host, ConnectionPool> hostConnectionPools = new ConcurrentHashMap<>();
// holds the future created by the first closeAsync() call; a non-null value is what isClosing() reports on
private final AtomicReference<CompletableFuture<Void>> closing = new AtomicReference<>(null);
// the CompletionException caught in initializeImplementation(), if any; used when building NoHostAvailableException
private Throwable initializationFailure = null;
ClusteredClient(final Cluster cluster, final Client.Settings settings) {
// nothing beyond the base class state is set up here; connection pools are created in initializeImplementation()
super(cluster, settings);
}
@Override
public boolean isClosing() {
    // closeAsync() stores its future in "closing", so a non-null value means that a close was requested
    final CompletableFuture<Void> closeInProgress = closing.get();
    return null != closeInProgress;
}
/**
* Submits a Gremlin script to the server and returns a {@link ResultSet} once the write of the request is
* complete.
*
* @param gremlin the gremlin script to execute
*/
public ResultSet submit(final String gremlin, final String graphOrTraversalSource) {
    // no bindings accompany the script, so null is passed along for the parameters
    final Map<String, Object> noParameters = null;
    return submit(gremlin, graphOrTraversalSource, noParameters);
}
/**
* Submits a Gremlin script and bound parameters to the server and returns a {@link ResultSet} once the write of
* the request is complete. If a script is to be executed repeatedly with slightly different arguments, prefer
* this method to concatenating a Gremlin script from dynamically produced strings and sending it to
* {@link #submit(String)}. Parameterized scripts will perform better.
*
* @param gremlin the gremlin script to execute
* @param parameters a map of parameters that will be bound to the script on execution
* @param graphOrTraversalSource rebinds the specified global Gremlin Server variable to "g"
*/
public ResultSet submit(final String gremlin, final String graphOrTraversalSource, final Map<String, Object> parameters) {
    try {
        // block until the asynchronous submission yields its ResultSet
        final CompletableFuture<ResultSet> pending = submitAsync(gremlin, graphOrTraversalSource, parameters);
        return pending.get();
    } catch (Exception ex) {
        // unchecked failures surface as-is; only checked ones get wrapped
        throw (ex instanceof RuntimeException) ? (RuntimeException) ex : new RuntimeException(ex);
    }
}
/**
* {@inheritDoc}
*/
@Override
public Client alias(final String graphOrTraversalSource) {
    // the inherited helper produces the same single-entry "g" map that would otherwise be built by hand
    final Map<String, String> gAlias = makeDefaultAliasMap(graphOrTraversalSource);
    return alias(gAlias);
}
/**
* {@inheritDoc}
*/
@Override
public Client alias(final Map<String, String> aliases) {
    // the returned client wraps this one together with the aliases and this client's settings
    final Client aliased = new AliasClusteredClient(this, aliases, settings);
    return aliased;
}
/**
* Uses a {@link LoadBalancingStrategy} to choose the best {@link Host} and then selects the best connection
* from that host's connection pool.
*/
@Override
protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException {
    final Iterator<Host> possibleHosts = getAvailablePossibleHosts(msg);

    // you can get no possible hosts in more than a few situations. perhaps the servers are just all down.
    // or perhaps the client is not configured properly (disables ssl when ssl is enabled on the server). or
    // the hosts can get marked unavailable during Cluster initialization or during usage if a Connection
    // can not be borrowed from the ConnectionPool. if it cannot be borrowed, the driver has long thrown
    // a generic NoHostAvailableException which implies that to truly determine the source of the failure
    // you would need to consult the logs or refer back to the initial exception that triggered the problem.
    // if the failure to find hosts occurred during initialization then we would at least expect to see the
    // initializationFailure as the cause of the NoHostAvailableException. if it came as a result of
    // ConnectionPool.borrowConnection() then we would only get the generic without a cause. Both are
    // problematic because the initializationFailure technically only captures a single failure, thus multiple
    // hosts aren't accounted for and the generic failure without a cause doesn't yield any information
    // about the source of the failure. moreover, the submit() requests fail fast without any pause to see
    // if the connection comes back which can fill logs with NoHostAvailableException errors making it harder
    // to see the source of the problem.
    //
    // as of 3.5.5, keeping in mind that the Cluster is trying to reconnect in the background, the approach
    // is to try to wait for a host to come back online up to the maxWaitForConnection by choosing a host at
    // random to wait for. in this way the fast throw of NoHostAvailableException goes away in favor of a
    // pause to wait for a host to come back while the driver reconnects in the background. we also get a
    // slightly better exception in that it will be some form of timeout with a cause attached
    final Host bestHost = possibleHosts.hasNext() ? possibleHosts.next() : getAnyPossibleHost();
    final ConnectionPool pool = hostConnectionPools.get(bestHost);

    // a host only has an entry in hostConnectionPools once its ConnectionPool was constructed successfully, so
    // a host that failed to initialize (or one that this client does not know) has no pool to borrow from.
    // report that as an unavailable host rather than letting a NullPointerException escape
    if (null == pool) {
        throw new NoHostAvailableException();
    }

    return pool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS);
}
private Host getAnyPossibleHost() {
    // work on a copy so that shuffling never reorders the collection returned by the Cluster
    final List<Host> candidates = new ArrayList<>(cluster.allHosts());
    Collections.shuffle(candidates);

    // after the shuffle the head of the list is a random pick
    final Host randomPick = candidates.get(0);
    return randomPick;
}
private Iterator<Host> getAvailablePossibleHosts(final RequestMessage msg) {
    // without an explicit host on the message the load balancing strategy makes the selection
    final boolean hostPinned = msg.optionalArgs(Tokens.ARGS_HOST).isPresent();
    if (!hostPinned) {
        return this.cluster.loadBalancingStrategy().select(msg);
    }

    // the host argument is read and then removed from the message args before the single host is offered
    final Host pinned = (Host) msg.getArgs().get(Tokens.ARGS_HOST);
    msg.getArgs().remove(Tokens.ARGS_HOST);
    return IteratorUtils.of(pinned);
}
/**
* Initializes the connection pools on all hosts.
*/
@Override
protected void initializeImplementation() {
// use a special executor here to initialize the Host instances as the worker thread pool may be
// insufficiently sized for this task and the parallel initialization of the ConnectionPool. if too small,
// tasks may be scheduled in such a way as to produce a deadlock: TINKERPOP-2550
//
// the cost of this single-threaded executor here should be fairly small because it is only used once at
// initialization and shutdown. since users will typically construct a Client once for the life of their
// application there shouldn't be tons of thread pools being created and destroyed.
final BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("gremlin-driver-initializer").build();
final ExecutorService hostExecutor = Executors.newSingleThreadExecutor(threadFactory);
try {
// one task per host is submitted to the dedicated executor and join() blocks until all of them are done
CompletableFuture.allOf(cluster.allHosts().stream()
.map(host -> CompletableFuture.runAsync(() -> initializeConnectionSetupForHost.accept(host), hostExecutor))
.toArray(CompletableFuture[]::new))
.join();
} catch (CompletionException ex) {
// a failure is logged and remembered rather than rethrown here, so that the availability check below
// decides whether initialization as a whole failed
logger.error("Initialization failed", ex);
this.initializationFailure = ex;
} finally {
// the executor is only needed for the initialization above
hostExecutor.shutdown();
}
// throw an error if there is no host available after initializing the connection pools.
if (cluster.availableHosts().isEmpty())
throwNoHostAvailableException();
// try to re-initialize any unavailable hosts in the background.
final List<Host> unavailableHosts = cluster.allHosts()
.stream().filter(host -> !host.isAvailable()).collect(Collectors.toList());
if (!unavailableHosts.isEmpty()) {
// no executor is supplied, so this task runs on the default executor used by CompletableFuture.runAsync()
CompletableFuture.runAsync(() -> handleUnavailableHosts(unavailableHosts));
}
}
private void throwNoHostAvailableException() {
    final Throwable rootCause = ExceptionUtils.getRootCause(initializationFailure);

    // only SSL and connect failures are attached as the cause; anything else yields the exception without one
    final boolean propagateCause = rootCause instanceof SSLException || rootCause instanceof ConnectException;
    if (!propagateCause) {
        throw new NoHostAvailableException();
    }
    throw new NoHostAvailableException(initializationFailure);
}
/**
 * Starts closing the connection pool of every host and returns a future that completes once all of them
 * have closed. Repeated calls return the future created by the first call.
 */
@Override
public synchronized CompletableFuture<Void> closeAsync() {
    final CompletableFuture<Void> alreadyClosing = closing.get();
    if (alreadyClosing != null) {
        return alreadyClosing;
    }

    final CompletableFuture[] poolCloses = hostConnectionPools.values().stream()
            .map(ConnectionPool::closeAsync)
            .toArray(CompletableFuture[]::new);
    closing.set(CompletableFuture.allOf(poolCloses));
    return closing.get();
}
/**
 * Per-host setup step: creates and registers the host's {@link ConnectionPool}, marks the host available and
 * notifies the load-balancing strategy. Any {@link RuntimeException} is logged and rethrown.
 */
private Consumer<Host> initializeConnectionSetupForHost = host -> {
    try {
        // a host whose pool cannot be constructed never reaches makeAvailable() and so comes up as dead
        final ConnectionPool pool = new ConnectionPool(host, ClusteredClient.this);
        hostConnectionPools.put(host, pool);

        // hosts are not flagged available when the cluster is initialized; that happens here instead
        host.makeAvailable();

        // tell the load-balancer that a new host joined the cluster
        ClusteredClient.this.cluster.loadBalancingStrategy().onNew(host);
    } catch (RuntimeException failure) {
        logger.error("Could not initialize client for " + host);
        throw failure;
    }
};
/**
 * Starts the re-initialization attempts for hosts that were unavailable after initialization by calling
 * {@code Host.makeUnavailable()} on each of them with {@link #tryReInitializeHost(Host)} as the retry function,
 * and waits for those calls to finish.
 *
 * @param unavailableHosts the hosts that were not available once initialization completed
 */
private void handleUnavailableHosts(List<Host> unavailableHosts) {
    try {
        CompletableFuture.allOf(unavailableHosts.stream()
                .map(host -> CompletableFuture.runAsync(() -> host.makeUnavailable(this::tryReInitializeHost)))
                .toArray(CompletableFuture[]::new))
                .join();
    } catch (CompletionException ex) {
        // unwrap so the log shows the real failure, and give the entry a message that says what failed
        final Throwable cause = (ex.getCause() == null) ? ex : ex.getCause();
        logger.error("Failed to start re-initialization of unavailable hosts {}", unavailableHosts, cause);
    }
}
/**
 * Runs the per-host connection setup again for a {@link Host} that was previously marked unavailable. This
 * method gets called as part of a schedule in {@link Host} to periodically retry the initialization.
 *
 * @return {@code true} if the setup completed, {@code false} if it threw
 */
public boolean tryReInitializeHost(final Host host) {
    logger.debug("Trying to re-initiate host connection pool on {}", host);

    boolean reinitialized;
    try {
        initializeConnectionSetupForHost.accept(host);
        reinitialized = true;
    } catch (Exception failure) {
        logger.debug("Failed re-initialization attempt on {}", host, failure);
        reinitialized = false;
    }
    return reinitialized;
}
}
/**
 * Uses a {@link Client.ClusteredClient} that rebinds requests to a specified {@link Graph} or
 * {@link TraversalSource} instances on the server-side.
 */
public static class AliasClusteredClient extends Client {
    /** The client that actually owns the connections; most operations delegate to it. */
    private final Client client;

    /** Aliases added to every request sent through this client. */
    private final Map<String, String> aliases = new HashMap<>();

    // NOTE(review): nothing in this class completes this future - close() and closeAsync() delegate to the
    // wrapped client - so the isDone() guards below never trigger from code visible here. Confirm intent.
    final CompletableFuture<Void> close = new CompletableFuture<>();

    AliasClusteredClient(final Client client, final Map<String, String> aliases, final Client.Settings settings) {
        super(client.cluster, settings);
        this.client = client;
        this.aliases.putAll(aliases);
    }

    /**
     * Submits the bytecode with {@link RequestOptions#EMPTY}.
     */
    @Override
    public CompletableFuture<ResultSet> submitAsync(final Bytecode bytecode) {
        return submitAsync(bytecode, RequestOptions.EMPTY);
    }

    /**
     * Builds a bytecode request for the "traversal" processor, applies the supplied request options and
     * submits it.
     */
    @Override
    public CompletableFuture<ResultSet> submitAsync(final Bytecode bytecode, final RequestOptions options) {
        try {
            // need to call buildMessage() right away to get client specific configurations, that way request
            // specific ones can override as needed
            final RequestMessage.Builder request = buildMessage(RequestMessage.build(Tokens.OPS_BYTECODE)
                    .processor("traversal")
                    .addArg(Tokens.ARGS_GREMLIN, bytecode));

            // apply settings if they were made available
            options.getBatchSize().ifPresent(batchSize -> request.add(Tokens.ARGS_BATCH_SIZE, batchSize));
            options.getTimeout().ifPresent(timeout -> request.add(Tokens.ARGS_EVAL_TIMEOUT, timeout));
            options.getOverrideRequestId().ifPresent(request::overrideRequestId);
            options.getUserAgent().ifPresent(userAgent -> request.add(Tokens.ARGS_USER_AGENT, userAgent));

            return submitAsync(request.create());
        } catch (RuntimeException re) {
            throw re;
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * Submits the message after adding this client's aliases to it. Aliases already present on the message are
     * request-level overrides and are preserved; only client aliases whose key is absent are added.
     */
    @Override
    public CompletableFuture<ResultSet> submitAsync(final RequestMessage msg) {
        final RequestMessage.Builder builder = RequestMessage.from(msg);

        // only add aliases which aren't already present. if they are present then they represent request level
        // overrides which should not be mucked with
        if (!aliases.isEmpty()) {
            final Map<?, ?> requestAliases =
                    (Map<?, ?>) msg.getArgs().getOrDefault(Tokens.ARGS_ALIASES, Collections.emptyMap());

            // start from the client aliases and let the request-level entries win on conflicting keys
            final Map<Object, Object> merged = new HashMap<>(aliases);
            merged.putAll(requestAliases);
            builder.addArg(Tokens.ARGS_ALIASES, merged);
        }

        return super.submitAsync(builder.create());
    }

    /**
     * Submits the bytecode of the supplied traversal.
     */
    @Override
    public CompletableFuture<ResultSet> submitAsync(final Traversal traversal) {
        return submitAsync(traversal.asAdmin().getBytecode());
    }

    /**
     * Initializes the wrapped client.
     *
     * @throws IllegalStateException if the {@code close} future is done
     */
    @Override
    public synchronized Client init() {
        if (close.isDone()) throw new IllegalStateException("Client is closed");

        // the underlying client may not have been init'd
        client.init();
        return this;
    }

    /**
     * Adds this client's aliases (if any) to the builder and then lets the wrapped client apply its own
     * configuration.
     *
     * @throws IllegalStateException if the {@code close} future is done
     */
    @Override
    public RequestMessage.Builder buildMessage(final RequestMessage.Builder builder) {
        if (close.isDone()) throw new IllegalStateException("Client is closed");
        if (!aliases.isEmpty())
            builder.addArg(Tokens.ARGS_ALIASES, aliases);

        return client.buildMessage(builder);
    }

    /**
     * Performs no initialization of its own; it only validates that the client is not closed and that the
     * cluster has at least one available host.
     */
    @Override
    protected void initializeImplementation() {
        // no init required
        if (close.isDone()) {
            throw new IllegalStateException("Client is closed");
        } else if (cluster.availableHosts().isEmpty()) {
            throw new NoHostAvailableException();
        }
    }

    /**
     * Delegates to the underlying {@link Client.ClusteredClient}.
     */
    @Override
    protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException {
        if (close.isDone()) throw new IllegalStateException("Client is closed");
        return client.chooseConnection(msg);
    }

    /**
     * Closes the wrapped client.
     */
    @Override
    public void close() {
        client.close();
    }

    /**
     * Asynchronously closes the wrapped client.
     */
    @Override
    public synchronized CompletableFuture<Void> closeAsync() {
        return client.closeAsync();
    }

    /**
     * Reports whether the wrapped client is closing.
     */
    @Override
    public boolean isClosing() {
        return client.isClosing();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Client alias(final Map<String, String> aliases) {
        if (close.isDone()) throw new IllegalStateException("Client is closed");
        return new AliasClusteredClient(client, aliases, settings);
    }
}
/**
* A {@code Client} implementation that operates in the context of a session. Requests are sent to a single
* server, where each request is bound to the same thread with the same set of bindings across requests.
* Transaction are not automatically committed. It is up the client to issue commit/rollback commands.
*/
public final static class SessionedClient extends Client {
private final String sessionId;
private final boolean manageTransactions;
private final boolean maintainStateAfterException;
private ConnectionPool connectionPool;
private final AtomicReference<CompletableFuture<Void>> closing = new AtomicReference<>(null);
SessionedClient(final Cluster cluster, final Client.Settings settings) {
super(cluster, settings);
this.sessionId = settings.getSession().get().sessionId;
this.manageTransactions = settings.getSession().get().manageTransactions;
this.maintainStateAfterException = settings.getSession().get().maintainStateAfterException;
}
/**
* Returns the session identifier bound to this {@code Client}.
*/
public String getSessionId() {
return sessionId;
}
/**
* Adds the {@link Tokens#ARGS_SESSION} value to every {@link RequestMessage}.
*/
@Override
public RequestMessage.Builder buildMessage(final RequestMessage.Builder builder) {
builder.processor("session");
builder.addArg(Tokens.ARGS_SESSION, sessionId);
builder.addArg(Tokens.ARGS_MANAGE_TRANSACTION, manageTransactions);
builder.addArg(Tokens.ARGS_MAINTAIN_STATE_AFTER_EXCEPTION, maintainStateAfterException);
return builder;
}
/**
* Since the session is bound to a single host, simply borrow a connection from that pool.
*/
@Override
protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException {
return connectionPool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS);
}
/**
* Randomly choose an available {@link Host} to bind the session too and initialize the {@link ConnectionPool}.
*/
@Override
protected void initializeImplementation() {
// chooses a host at random from all hosts
if (cluster.allHosts().isEmpty()) {
throw new IllegalStateException("No available host in the cluster");
}
final List<Host> hosts = new ArrayList<>(cluster.allHosts());
Collections.shuffle(hosts);
// if a host has been marked as available, use it instead
Optional<Host> host = hosts.stream().filter(Host::isAvailable).findFirst();
final Host selectedHost = host.orElse(hosts.get(0));
// only mark host as available if we can initialize the connection pool successfully
try {
connectionPool = new ConnectionPool(selectedHost, this, Optional.of(1), Optional.of(1));
selectedHost.makeAvailable();
} catch (RuntimeException ex) {
logger.error("Could not initialize client for {}", host, ex);
throw new NoHostAvailableException(ex);
}
}
@Override
public boolean isClosing() {
return closing.get() != null;
}
/**
* Close the bound {@link ConnectionPool}.
*/
@Override
public synchronized CompletableFuture<Void> closeAsync() {
if (closing.get() != null)
return closing.get();
// the connection pool may not have been initialized if requests weren't sent across it. in those cases
// we just need to return a pre-completed future
final CompletableFuture<Void> connectionPoolClose = null == connectionPool ?
CompletableFuture.completedFuture(null) : connectionPool.closeAsync();
closing.set(connectionPoolClose);
return connectionPoolClose;
}
}
/**
 * Settings given to {@link Cluster#connect(Client.Settings)} that configure how a {@link Client} will behave.
 * Instances are created through {@link #build()}.
 */
public static class Settings {
    private final Optional<SessionSettings> session;

    private Settings(final Builder builder) {
        session = builder.session;
    }

    /**
     * Starts a new {@link Builder}.
     */
    public static Builder build() {
        return new Builder();
    }

    /**
     * Determines if the {@link Client} is to be constructed with a session. A present value means that a
     * session is expected.
     */
    public Optional<SessionSettings> getSession() {
        return session;
    }

    /**
     * Builder for {@link Settings}. No session is configured unless one of the {@code useSession} overloads
     * is called; the last call wins.
     */
    public static class Builder {
        private Optional<SessionSettings> session = Optional.empty();

        private Builder() {
        }

        /**
         * Enables or disables a session. When enabled, the session uses the defaults of
         * {@link SessionSettings.Builder}. Replaces whatever an earlier {@code useSession} call configured.
         */
        public Builder useSession(final boolean enabled) {
            if (enabled) {
                session = Optional.of(SessionSettings.build().create());
            } else {
                session = Optional.empty();
            }
            return this;
        }

        /**
         * Enables a session with the provided identifier and otherwise default {@link SessionSettings}. A
         * {@code null} or empty identifier disables the session. Replaces whatever an earlier
         * {@code useSession} call configured.
         */
        public Builder useSession(final String sessionId) {
            final boolean hasId = sessionId != null && !sessionId.isEmpty();
            if (hasId) {
                session = Optional.of(SessionSettings.build().sessionId(sessionId).create());
            } else {
                session = Optional.empty();
            }
            return this;
        }

        /**
         * Enables a session with the given settings; {@code null} disables the session. Replaces whatever an
         * earlier {@code useSession} call configured.
         */
        public Builder useSession(final SessionSettings settings) {
            session = Optional.ofNullable(settings);
            return this;
        }

        /**
         * Creates the {@link Settings} from the current state of this builder.
         */
        public Settings create() {
            return new Settings(this);
        }
    }
}
/**
 * Session-related settings for a {@link Client}. Instances are immutable and created through {@link #build()}.
 */
public static class SessionSettings {
    private final boolean manageTransactions;
    private final String sessionId;
    private final boolean forceClosed;
    private final boolean maintainStateAfterException;

    private SessionSettings(final Builder builder) {
        this.manageTransactions = builder.manageTransactions;
        this.sessionId = builder.sessionId;
        this.forceClosed = builder.forceClosed;
        this.maintainStateAfterException = builder.maintainStateAfterException;
    }

    /**
     * Starts a new {@link Builder} populated with the default values.
     */
    public static SessionSettings.Builder build() {
        return new SessionSettings.Builder();
    }

    /**
     * When {@code true}, transactions are "managed" so that each request represents a complete transaction.
     */
    public boolean manageTransactions() {
        return this.manageTransactions;
    }

    /**
     * Returns the session identifier.
     */
    public String getSessionId() {
        return this.sessionId;
    }

    /**
     * Tells whether the session will be force closed. {@link Builder#forceClosed(boolean)} explains what that
     * implies.
     */
    public boolean isForceClosed() {
        return this.forceClosed;
    }

    /**
     * Tells whether session state is kept after an exception. See
     * {@link Builder#maintainStateAfterException(boolean)}.
     */
    public boolean maintainStateAfterException() {
        return this.maintainStateAfterException;
    }

    /**
     * Builder for {@link SessionSettings}. Defaults: unmanaged transactions, a random {@code UUID} as the
     * session identifier, no forced close and no state kept after an exception.
     */
    public static class Builder {
        private boolean manageTransactions = false;
        private String sessionId = UUID.randomUUID().toString();
        private boolean forceClosed = false;
        private boolean maintainStateAfterException = false;

        private Builder() {
        }

        /**
         * When {@code true} an exception within a session will not close the session and remove the state
         * bound to it. This setting is for the {@code UnifiedChannelizer}; {@code true} makes sessions behave
         * similar to how they did under the {@code OpProcessor} approach original to Gremlin Server. The
         * default is {@code false}.
         */
        public Builder maintainStateAfterException(final boolean maintainStateAfterException) {
            this.maintainStateAfterException = maintainStateAfterException;
            return this;
        }

        /**
         * When enabled, transactions are "managed" so that each request represents a complete transaction.
         * The default is {@code false}.
         */
        public Builder manageTransactions(final boolean manage) {
            this.manageTransactions = manage;
            return this;
        }

        /**
         * Sets the session identifier, which must be neither {@code null} nor empty. The default is a random
         * {@code UUID}.
         *
         * @throws IllegalArgumentException if {@code sessionId} is {@code null} or empty
         */
        public Builder sessionId(final String sessionId) {
            final boolean missing = sessionId == null || sessionId.isEmpty();
            if (missing) {
                throw new IllegalArgumentException("sessionId cannot be null or empty");
            }
            this.sessionId = sessionId;
            return this;
        }

        /**
         * Determines if the session should be force closed when the client is closed. Force closing does not
         * try to close open transactions from running jobs and leaves it to the underlying graph to decide
         * what happens to those orphaned transactions. Setting this to {@code true} tends to make close
         * faster, which can be desirable if Gremlin Server has a long session timeout and a long script
         * evaluation timeout. The default is {@code false}.
         */
        public Builder forceClosed(final boolean forced) {
            this.forceClosed = forced;
            return this;
        }

        /**
         * Creates the {@link SessionSettings} from the current state of this builder.
         */
        public SessionSettings create() {
            return new SessionSettings(this);
        }
    }
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.driver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.util.TimeUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
/**
* A pool of {@link Connection} instances to a single {@link Host}. Connections are handed out with
* {@link #borrowConnection(long, TimeUnit)} and given back with {@link #returnConnection(Connection)}.
*
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
final class ConnectionPool {
private static final Logger logger = LoggerFactory.getLogger(ConnectionPool.class);
// NOTE(review): presumably the defaults for the corresponding pool settings - none of these four constants
// is referenced inside this class.
public static final int MIN_POOL_SIZE = 2;
public static final int MAX_POOL_SIZE = 8;
public static final int MIN_SIMULTANEOUS_USAGE_PER_CONNECTION = 8;
public static final int MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = 16;
// the host every connection in this pool points at
public final Host host;
private final Cluster cluster;
private final Client client;
// connections currently in the pool
private final List<Connection> connections;
// number of connections counted as open; incremented before a connection is actually created when the pool
// grows (see addConnectionIfUnderMaximum)
private final AtomicInteger open;
// connections removed from the pool that are waiting to be closed once they are no longer borrowed
private final Set<Connection> bin = new CopyOnWriteArraySet<>();
private final int minPoolSize;
private final int maxPoolSize;
private final int minSimultaneousUsagePerConnection;
private final int maxSimultaneousUsagePerConnection;
private final int minInProcess;
// value returned by toString()
private final String poolLabel;
// number of connection-creation tasks that have been submitted but have not finished yet
private final AtomicInteger scheduledForCreation = new AtomicInteger();
// null until closeAsync() is called; a non-null value marks the pool as closed
private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
// number of threads currently waiting in awaitAvailableConnection(); only modified while holding waitLock
private volatile int waiter = 0;
// fair lock guarding the hasAvailableConnection condition
private final Lock waitLock = new ReentrantLock(true);
private final Condition hasAvailableConnection = waitLock.newCondition();
// NOTE(review): not referenced anywhere else in this class and never shut down - confirm whether it is
// still needed.
private final ExecutorService reconnectPool = Executors.newFixedThreadPool(4);
/**
 * Creates a pool whose minimum and maximum size come from the cluster's connection pool settings.
 */
public ConnectionPool(final Host host, final Client client) {
    this(host, client, Optional.empty(), Optional.empty());
}

/**
 * Creates a pool and opens {@code minPoolSize} connections in parallel on the cluster executor, waiting for
 * all of them before returning.
 *
 * @param overrideMinPoolSize replaces the configured minimum pool size when present
 * @param overrideMaxPoolSize replaces the configured maximum pool size when present
 * @throws CancellationException if the creation of the connections is cancelled
 * @throws CompletionException if any of the initial connections cannot be created; the pool is closed first
 */
public ConnectionPool(final Host host, final Client client, final Optional<Integer> overrideMinPoolSize,
                      final Optional<Integer> overrideMaxPoolSize) {
    this.host = host;
    this.client = client;
    this.cluster = client.cluster;
    this.poolLabel = "Connection Pool {host=" + host + "}";

    final Settings.ConnectionPoolSettings settings = settings();
    this.minPoolSize = overrideMinPoolSize.orElse(settings.minSize);
    this.maxPoolSize = overrideMaxPoolSize.orElse(settings.maxSize);
    this.minSimultaneousUsagePerConnection = settings.minSimultaneousUsagePerConnection;
    this.maxSimultaneousUsagePerConnection = settings.maxSimultaneousUsagePerConnection;
    this.minInProcess = settings.minInProcessPerConnection;

    this.connections = new CopyOnWriteArrayList<>();
    this.open = new AtomicInteger();

    try {
        // one task per initial connection; a failed connection surfaces as a CompletionException from join()
        final Runnable openConnection = () -> {
            try {
                this.connections.add(new Connection(host.getHostUri(), this, settings.maxInProcessPerConnection));
                this.open.incrementAndGet();
            } catch (ConnectionException e) {
                throw new CompletionException(e);
            }
        };

        final List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int scheduled = 0; scheduled < minPoolSize; scheduled++) {
            pending.add(CompletableFuture.runAsync(openConnection, cluster.executor()));
        }

        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
    } catch (CancellationException cancelled) {
        logger.warn("Initialization of connections cancelled for {}", getPoolInfo(), cancelled);
        throw cancelled;
    } catch (CompletionException failed) {
        // some connections may already be open - close the pool so that they get closed too
        this.closeAsync();

        final String errMsg = "Could not initialize " + minPoolSize + " (minPoolSize) connections in pool." +
                " Successful connections=" + this.connections.size() +
                ". Closing the connection pool.";

        // report the underlying failure when there is one, otherwise the CompletionException itself
        final Throwable underlying = failed.getCause();
        throw new CompletionException(errMsg, null != underlying ? underlying : failed);
    }

    logger.info("Opening connection pool on {} with core size of {}", host, minPoolSize);
}
/**
* Returns the connection pool settings of the {@link Cluster} this pool belongs to.
*/
public Settings.ConnectionPoolSettings settings() {
return cluster.connectionPoolSettings();
}
/**
 * Obtains a connection as quickly as possible in order to test connectivity to a host that was marked
 * unavailable: returns the least used live connection from the pool if there is one, otherwise opens a
 * brand new connection.
 * <p>
 * NOTE(review): the returned connection is neither counted as borrowed nor, when newly created, added to the
 * pool - callers would have to manage its lifecycle themselves. Confirm before using this method.
 *
 * @return a pooled connection or a newly opened one
 * @throws ConnectionException if the pool is closed or the new connection cannot be created
 */
private Connection createFastConnection() throws ConnectionException {
    logger.debug("Fast borrowing connection from pool on {} to test connectivity after being marked unavailable", host);

    if (isClosed()) throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");

    // try the pool first to see if anything is there
    final Connection leastUsedConn = selectLeastUsed();
    if (null != leastUsedConn) return leastUsedConn;

    // nothing in the pool so try to just connect as fast as possible to quickly evaluate if the host is present.
    return new Connection(host.getHostUri(), this, settings().maxInProcessPerConnection);
}
/**
 * Borrows the least used connection from the pool, waiting up to the given timeout when none can be handed
 * out immediately. If the pool is empty, creation of up to {@code minPoolSize} connections is scheduled first.
 *
 * @param timeout how long to wait for a connection to become available
 * @param unit the unit of {@code timeout}
 * @return a connection whose {@code borrowed} count has been incremented for the caller
 * @throws TimeoutException if no connection became available in time
 * @throws ConnectionException if the pool is closed
 */
public Connection borrowConnection(final long timeout, final TimeUnit unit) throws TimeoutException, ConnectionException {
    logger.debug("Borrowing connection from pool on {} - timeout in {} {}", host, timeout, unit);

    if (isClosed()) throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");

    final Connection leastUsedConn = selectLeastUsed();

    if (connections.isEmpty()) {
        logger.debug("Tried to borrow connection but the pool was empty for {} - scheduling pool creation and waiting for connection", host);
        for (int i = 0; i < minPoolSize; i++) {
            // If many connections are borrowed at the same time there needs to be a check to make sure no
            // additional ones get scheduled for creation. The check and the increment happen as one atomic
            // update so that concurrent borrowers cannot both pass the check and over-schedule.
            final int alreadyScheduled = scheduledForCreation.getAndUpdate(
                    scheduled -> scheduled < minPoolSize ? scheduled + 1 : scheduled);
            if (alreadyScheduled < minPoolSize)
                newConnection();
        }

        return waitForConnection(timeout, unit);
    }

    if (null == leastUsedConn) {
        if (isClosed())
            throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");
        logger.debug("Pool was initialized but a connection could not be selected earlier - waiting for connection on {}", host);
        return waitForConnection(timeout, unit);
    }

    // if the number borrowed on the least used connection exceeds the max allowed and the pool size is
    // not at maximum then consider opening a connection
    final int currentPoolSize = connections.size();
    if (leastUsedConn.borrowed.get() >= maxSimultaneousUsagePerConnection && currentPoolSize < maxPoolSize) {
        if (logger.isDebugEnabled())
            logger.debug("Least used {} on {} exceeds maxSimultaneousUsagePerConnection but pool size {} < maxPoolSize - consider new connection",
                    leastUsedConn.getConnectionInfo(), host, currentPoolSize);
        considerNewConnection();
    }

    // claim a slot on the connection with a compare-and-set loop; retry if another thread changed the count
    while (true) {
        final int borrowed = leastUsedConn.borrowed.get();
        final int availableInProcess = leastUsedConn.availableInProcess();

        // test the same snapshot that gets logged rather than reading the value a second time
        if (borrowed >= maxSimultaneousUsagePerConnection && availableInProcess == 0) {
            logger.debug("Least used connection selected from pool for {} but borrowed [{}] >= availableInProcess [{}] - wait",
                    host, borrowed, availableInProcess);
            return waitForConnection(timeout, unit);
        }

        if (leastUsedConn.borrowed.compareAndSet(borrowed, borrowed + 1)) {
            if (logger.isDebugEnabled())
                logger.debug("Return least used {} on {}", leastUsedConn.getConnectionInfo(), host);
            return leastUsedConn;
        }
    }
}
/**
* Gives a borrowed connection back to the pool. The connection's {@code borrowed} count is decremented and the
* connection is then replaced, closed, destroyed or announced as available depending on its state and on the
* current size of the pool.
*
* @param connection the connection being returned
* @throws ConnectionException if the pool is closed; in that case the {@code borrowed} count is left untouched
*/
public void returnConnection(final Connection connection) throws ConnectionException {
logger.debug("Attempting to return {} on {}", connection, host);
if (isClosed()) throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");
final int borrowed = connection.borrowed.decrementAndGet();
if (connection.isDead()) {
logger.debug("Marking {} as dead", this.host);
this.replaceConnection(connection);
} else {
// a connection that was already binned is closed as soon as its last borrower returns it; either way a
// binned connection with no borrowers is not considered any further
if (bin.contains(connection) && borrowed == 0) {
logger.debug("{} is already in the bin and it has no inflight requests so it is safe to close", connection);
if (bin.remove(connection))
connection.closeAsync();
return;
}
// destroy a connection that exceeds the minimum pool size - it does not have the right to live if it
// isn't busy. replace a connection that has a low available in process count which likely means that
// it's backing up with requests that might never have returned. consider the maxPoolSize in this condition
// because if it is equal to 1 (which it is for a session) then there is no need to replace the connection
// as it will be responsible for every single request. if neither of these scenarios are met then let the
// world know the connection is available.
final int poolSize = connections.size();
final int availableInProcess = connection.availableInProcess();
if (poolSize > minPoolSize && borrowed <= minSimultaneousUsagePerConnection) {
if (logger.isDebugEnabled())
logger.debug("On {} pool size of {} > minPoolSize {} and borrowed of {} <= minSimultaneousUsagePerConnection {} so destroy {}",
host, poolSize, minPoolSize, borrowed, minSimultaneousUsagePerConnection, connection.getConnectionInfo());
destroyConnection(connection);
} else if (availableInProcess < minInProcess && maxPoolSize > 1) {
if (logger.isDebugEnabled())
logger.debug("On {} availableInProcess {} < minInProcess {} so replace {}", host, availableInProcess, minInProcess, connection.getConnectionInfo());
replaceConnection(connection);
} else
announceAvailableConnection();
}
}
/**
* Returns the {@link Client} this pool was created for.
*/
Client getClient() {
return client;
}
/**
* Returns the {@link Cluster} of the client this pool was created for.
*/
Cluster getCluster() {
return cluster;
}
/**
* Reports whether {@link #closeAsync()} has been called, i.e. whether a close future has been recorded. The
* connections themselves may still be in the process of closing.
*/
public boolean isClosed() {
return this.closeFuture.get() != null;
}
/**
 * Permanently kills the pool: wakes every thread waiting for a connection and closes all connections
 * currently in the pool. Repeated calls return the future created by the first call.
 *
 * @return a future that completes when all connections in the pool have closed
 */
public synchronized CompletableFuture<Void> closeAsync() {
    final CompletableFuture<Void> existing = closeFuture.get();
    if (existing != null) {
        return existing;
    }

    logger.info("Signalled closing of connection pool on {} with core size of {}", host, minPoolSize);

    announceAllAvailableConnection();
    final CompletableFuture<Void> allClosed = killAvailableConnections();
    closeFuture.set(allClosed);
    return allClosed;
}
/**
* Returns the number of connections that were removed from the pool and are still waiting in the bin to be
* closed. Required for testing.
*/
int numConnectionsWaitingToCleanup() {
return bin.size();
}
/**
 * Starts closing every connection currently in the pool and decrements the open count as each close
 * completes.
 *
 * @return a future that completes when all of those connections have closed
 */
private CompletableFuture<Void> killAvailableConnections() {
    final CompletableFuture[] closes = connections.stream()
            .map(connection -> {
                final CompletableFuture<Void> closed = connection.closeAsync();
                closed.thenRun(open::decrementAndGet);
                return closed;
            })
            .toArray(CompletableFuture[]::new);
    return CompletableFuture.allOf(closes);
}
/**
 * Schedules a replacement for the given connection and removes the connection from the pool. Only the first
 * call for a connection has an effect: the connection is flagged as being replaced and later calls return
 * immediately. Nothing is replaced once the pool is closing or closed.
 */
void replaceConnection(final Connection connection) {
    logger.info("Replace {}", connection);

    // the flag is set unconditionally; its previous value says whether a replacement is already under way
    final boolean alreadyBeingReplaced = connection.isBeingReplaced.getAndSet(true);
    if (alreadyBeingReplaced || isClosed()) {
        return;
    }

    considerNewConnection();
    definitelyDestroyConnection(connection);
}
/**
 * Schedules the creation of one additional connection unless a creation is already scheduled.
 */
private void considerNewConnection() {
    logger.debug("Considering new connection on {} where pool size is {}", host, connections.size());

    int pending;
    do {
        pending = scheduledForCreation.get();
        logger.debug("There are {} connections scheduled for creation on {}", pending, host);

        // don't create more than one at a time
        if (pending >= 1) {
            return;
        }
    } while (!scheduledForCreation.compareAndSet(pending, pending + 1));

    newConnection();
}
/**
 * Submits a task to the cluster executor that tries to add one connection to the pool. Callers increment
 * {@code scheduledForCreation} before calling this method; the task releases that reservation when it ends.
 */
private void newConnection() {
    cluster.executor().submit(() -> {
        try {
            addConnectionIfUnderMaximum();
        } finally {
            // always release the reservation - if it leaked after an unexpected exception the counter would
            // stay above zero and no further connection would ever be scheduled
            scheduledForCreation.decrementAndGet();
        }
        return null;
    });
}
/**
 * Opens a new connection and adds it to the pool if the number of open connections is below
 * {@code maxPoolSize} and the pool is not closed.
 *
 * @return {@code true} if a connection was added, {@code false} otherwise
 */
private boolean addConnectionIfUnderMaximum() {
    // reserve a slot in the open count first so that concurrent callers cannot exceed the maximum
    int opened;
    do {
        opened = open.get();
        if (opened >= maxPoolSize) {
            return false;
        }
    } while (!open.compareAndSet(opened, opened + 1));

    // the pool was closed in the meantime - give the slot back
    if (isClosed()) {
        open.decrementAndGet();
        return false;
    }

    final Connection added;
    try {
        added = new Connection(host.getHostUri(), this, settings().maxInProcessPerConnection);
        connections.add(added);
    } catch (ConnectionException ce) {
        logger.error("Connections were under max, but there was an error creating the connection.", ce);
        open.decrementAndGet();
        considerHostUnavailable();
        return false;
    }

    announceAvailableConnection();
    return true;
}
/**
* Removes the connection from the pool unless doing so would take the number of open connections below
* {@code minPoolSize}.
*
* @return {@code true} if the connection was handed to {@link #definitelyDestroyConnection(Connection)},
* {@code false} if the pool is already at or below its minimum size
*/
private boolean destroyConnection(final Connection connection) {
// reserve the removal by decrementing the open count with a compare-and-set loop
// NOTE(review): definitelyDestroyConnection() decrements the open count again when it moves the connection
// to the bin, so a destroy through this path appears to count twice - confirm whether that is intended.
while (true) {
int opened = open.get();
if (opened <= minPoolSize)
return false;
if (open.compareAndSet(opened, opened - 1))
break;
}
definitelyDestroyConnection(connection);
return true;
}
/**
* Takes the connection out of the pool regardless of the pool size. The connection is moved to the bin and is
* closed right away if it is dead or no longer borrowed; otherwise it stays in the bin until it is returned
* (see {@link #returnConnection(Connection)}).
*/
public void definitelyDestroyConnection(final Connection connection) {
// only add to the bin for future removal if its not already there.
if (!bin.contains(connection) && !connection.isClosing()) {
bin.add(connection);
connections.remove(connection);
open.decrementAndGet();
}
// only close the connection for good once it is done being borrowed or when it is dead
if (connection.isDead() || connection.borrowed.get() == 0) {
// bin.remove() returns true for one caller only, so the connection is closed at most once from here
if (bin.remove(connection)) {
connection.closeAsync();
// TODO: Log the following message on completion of the future returned by closeAsync.
logger.debug("{} destroyed", connection.getConnectionInfo());
}
}
}
/**
* Waits until a connection can be borrowed or the timeout has elapsed. After each wake-up the least used
* connection is re-selected and borrowed if it has capacity; otherwise the wait continues with the remaining
* time. When the time runs out the host is considered unavailable.
*
* @param timeout how long to wait in total
* @param unit the unit of {@code timeout}
* @return a connection whose {@code borrowed} count has been incremented for the caller
* @throws TimeoutException if no connection could be borrowed within the timeout
* @throws ConnectionException if the pool is closed while waiting
*/
private Connection waitForConnection(final long timeout, final TimeUnit unit) throws TimeoutException, ConnectionException {
long start = System.nanoTime();
long remaining = timeout;
long to = timeout;
do {
try {
awaitAvailableConnection(remaining, unit);
} catch (InterruptedException e) {
// restore the interrupt flag and zero the budget so the loop ends after this pass
Thread.currentThread().interrupt();
to = 0;
}
if (isClosed())
throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");
final Connection leastUsed = selectLeastUsed();
if (leastUsed != null) {
// claim a slot with a compare-and-set loop; give up on this connection if it has no capacity left
while (true) {
final int inFlight = leastUsed.borrowed.get();
final int availableInProcess = leastUsed.availableInProcess();
if (inFlight >= availableInProcess) {
logger.debug("Least used {} on {} has requests borrowed [{}] >= availableInProcess [{}] - may timeout waiting for connection",
leastUsed, host, inFlight, availableInProcess);
break;
}
if (leastUsed.borrowed.compareAndSet(inFlight, inFlight + 1)) {
if (logger.isDebugEnabled())
logger.debug("Return least used {} on {} after waiting", leastUsed.getConnectionInfo(), host);
return leastUsed;
}
}
}
// presumably TimeUtil.timeSince() returns the time elapsed since start expressed in unit - TODO confirm
remaining = to - TimeUtil.timeSince(start, unit);
logger.debug("Continue to wait for connection on {} if {} > 0", host, remaining);
} while (remaining > 0);
logger.error("Timed-out ({} {}) waiting for connection on {} - possibly unavailable", timeout, unit, host);
// if we timeout borrowing a connection that might mean the host is dead (or the timeout was super short).
// either way supply a function to reconnect
this.considerHostUnavailable();
throw new TimeoutException("Timed-out waiting for connection on " + host + " - possibly unavailable");
}
/**
 * Marks the host as unavailable with {@link #tryReconnect(Host)} as the function used to retry it, releases
 * every connection in the pool and notifies the load-balancing strategy. Called when a connection is "dead"
 * due to a non-recoverable error.
 */
public void considerHostUnavailable() {
    host.makeUnavailable(this::tryReconnect);

    // the host is unavailable, so none of its connections should be kept
    for (final Connection pooled : connections) {
        definitelyDestroyConnection(pooled);
    }

    // let the load-balancer know that the host is acting poorly
    this.cluster.loadBalancingStrategy().onUnavailable(host);
}
/**
 * Tries to reconnect to a {@link Host} that was previously marked as unavailable by borrowing a connection
 * and sending the cluster's validation request over it. This method gets called as part of a schedule in
 * {@link Host} to periodically try to create working connections.
 *
 * @return {@code true} if the validation request completed, {@code false} on any failure
 */
private boolean tryReconnect(final Host h) {
    logger.debug("Trying to re-establish connection on {}", h);

    Connection probe = null;
    try {
        final long maxWait = cluster.connectionPoolSettings().maxWaitForConnection;
        probe = borrowConnection(maxWait, TimeUnit.MILLISECONDS);

        final RequestMessage ping = client.buildMessage(cluster.validationRequest()).create();
        final CompletableFuture<ResultSet> response = new CompletableFuture<>();
        probe.write(ping, response);

        // block until the full result of the validation request has arrived
        final ResultSet results = response.get();
        results.all().get();

        // host is reconnected and a connection is now available
        this.cluster.loadBalancingStrategy().onAvailable(h);
        return true;
    } catch (Exception ex) {
        logger.warn("Failed reconnect attempt on {}", h, ex);
        if (probe != null) {
            definitelyDestroyConnection(probe);
        }
        return false;
    }
}
/**
 * Wakes up one thread waiting for a connection, if any thread is waiting.
 */
private void announceAvailableConnection() {
    logger.debug("Announce connection available on {}", host);

    if (waiter != 0) {
        waitLock.lock();
        try {
            hasAvailableConnection.signal();
        } finally {
            waitLock.unlock();
        }
    }
}
/**
 * Picks the live connection with the fewest borrowers. On a tie the connection that comes first in the pool
 * wins.
 *
 * @return the least used connection or {@code null} if the pool holds no live connection
 */
private Connection selectLeastUsed() {
    Connection candidate = null;
    int fewestBorrowed = Integer.MAX_VALUE;

    for (final Connection current : connections) {
        final int currentBorrowed = current.borrowed.get();
        if (current.isDead() || currentBorrowed >= fewestBorrowed) {
            continue;
        }
        fewestBorrowed = currentBorrowed;
        candidate = current;
    }

    return candidate;
}
/**
 * Waits on {@code hasAvailableConnection} for at most the supplied amount of time. The {@code waiter} count is
 * incremented for the duration of the wait and decremented again before the lock is released.
 *
 * @param timeout the maximum time to wait
 * @param unit the unit of {@code timeout}
 * @throws InterruptedException propagated from the {@code await} call
 */
private void awaitAvailableConnection(long timeout, TimeUnit unit) throws InterruptedException {
logger.debug("Wait {} {} for an available connection on {} with {}", timeout, unit, host, Thread.currentThread());
waitLock.lock();
// register as a waiter - the announce methods return early without signalling while waiter is zero
waiter++;
try {
// any value returned by await is discarded, so this method does not report whether the wait ended early
hasAvailableConnection.await(timeout, unit);
} finally {
// always undo the registration and release the lock, even when await throws
waiter--;
waitLock.unlock();
}
}
/**
 * Signals every thread waiting on {@code hasAvailableConnection}, provided that at least one waiter is registered.
 */
private void announceAllAvailableConnection() {
    // only take the lock when somebody is actually waiting
    if (waiter != 0) {
        waitLock.lock();
        try {
            hasAvailableConnection.signalAll();
        } finally {
            waitLock.unlock();
        }
    }
}
/**
 * Collects the channel identifier of every connection held by this pool into a set. At the moment this is only
 * needed by tests.
 */
Set<String> getConnectionIDs() {
    return connections.stream()
            .map(connection -> connection.getChannelId())
            .collect(Collectors.toSet());
}
/**
 * Builds a single-line description of the pool: the host in parentheses followed by the string form of every
 * connection, each one terminated by a comma.
 */
public String getPoolInfo() {
    final StringBuilder info = new StringBuilder("ConnectionPool (");
    info.append(host).append(") - ");
    connections.forEach(connection -> info.append(connection).append(","));
    return info.toString().trim();
}
/**
 * Returns the {@code poolLabel} of this pool.
 */
@Override
public String toString() {
return poolLabel;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.server;
import org.apache.tinkerpop.gremlin.util.ExceptionHelper;
import org.apache.log4j.Level;
import org.apache.tinkerpop.gremlin.TestHelper;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.RequestOptions;
import org.apache.tinkerpop.gremlin.driver.Result;
import org.apache.tinkerpop.gremlin.driver.ResultSet;
import org.apache.tinkerpop.gremlin.driver.Tokens;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
import org.apache.tinkerpop.gremlin.driver.ser.GraphBinaryMessageSerializerV1;
import org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0;
import org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV3d0;
import org.apache.tinkerpop.gremlin.driver.ser.JsonBuilderGryoSerializer;
import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
import org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin;
import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.server.handler.OpExecutorHandler;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.io.Storage;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertex;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;
import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
import groovy.json.JsonBuilder;
import org.apache.tinkerpop.gremlin.util.TimeUtil;
import org.apache.tinkerpop.gremlin.util.function.FunctionUtils;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.awt.Color;
import java.io.File;
import java.net.ConnectException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.AllOf.allOf;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.hamcrest.core.StringContains.containsString;
import static org.hamcrest.core.StringEndsWith.endsWith;
import static org.hamcrest.core.StringStartsWith.startsWith;
import static org.hamcrest.number.OrderingComparison.greaterThan;
import static org.hamcrest.number.OrderingComparison.lessThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.hamcrest.CoreMatchers.is;
import static org.mockito.Mockito.verify;
/**
* Integration tests for gremlin-driver configurations and settings.
*
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegrationTest {
// slf4j logger used by the tests themselves
private static final Logger logger = LoggerFactory.getLogger(GremlinDriverIntegrateTest.class);
// created and attached to the log4j root logger in setupForEachTest(), detached again in teardownForEachTest()
private Log4jRecordingAppender recordingAppender = null;
// level of the logger that setupForEachTest() overrides for selected tests - restored in teardownForEachTest()
private Level previousLogLevel;
/**
 * Attaches a fresh recording appender to the root logger and, for a few specific tests, overrides the level of a
 * particular logger after remembering its current level.
 */
@Before
public void setupForEachTest() {
    recordingAppender = new Log4jRecordingAppender();
    final org.apache.log4j.Logger root = org.apache.log4j.Logger.getRootLogger();

    final String testName = name.getMethodName();
    final boolean keepAliveTest = testName.equals("shouldKeepAliveForWebSockets")
            || testName.equals("shouldKeepAliveForWebSocketsWithNoInFlightRequests");

    if (keepAliveTest) {
        // the keep-alive tests count recorded ping messages, so this logger is opened up to DEBUG for them
        final org.apache.log4j.Logger handlerLogger = org.apache.log4j.Logger.getLogger(WebSocketClientHandler.class);
        previousLogLevel = handlerLogger.getLevel();
        handlerLogger.setLevel(Level.DEBUG);
    } else if (testName.equals("shouldEventuallySucceedAfterMuchFailure")) {
        final org.apache.log4j.Logger handlerLogger = org.apache.log4j.Logger.getLogger(OpExecutorHandler.class);
        previousLogLevel = handlerLogger.getLevel();
        handlerLogger.setLevel(Level.ERROR);
    }

    root.addAppender(recordingAppender);
}
/**
 * Restores the logger level that was overridden for specific tests during setup and detaches the recording
 * appender from the root logger.
 */
@After
public void teardownForEachTest() {
    final org.apache.log4j.Logger root = org.apache.log4j.Logger.getRootLogger();

    final String testName = name.getMethodName();
    final boolean keepAliveTest = testName.equals("shouldKeepAliveForWebSockets")
            || testName.equals("shouldKeepAliveForWebSocketsWithNoInFlightRequests");

    if (keepAliveTest) {
        final org.apache.log4j.Logger handlerLogger = org.apache.log4j.Logger.getLogger(WebSocketClientHandler.class);
        handlerLogger.setLevel(previousLogLevel);
    } else if (testName.equals("shouldEventuallySucceedAfterMuchFailure")) {
        final org.apache.log4j.Logger handlerLogger = org.apache.log4j.Logger.getLogger(OpExecutorHandler.class);
        handlerLogger.setLevel(previousLogLevel);
    }

    root.removeAppender(recordingAppender);
}
/**
 * Adjusts the Gremlin Server {@link Settings} depending on the name of the test that is about to run. Tests that
 * are not listed here get the settings unchanged.
 */
@Override
public Settings overrideSettings(final Settings settings) {
    switch (name.getMethodName()) {
        case "shouldAliasTraversalSourceVariables":
        case "shouldAliasTraversalSourceVariablesInSession":
            try {
                // register an init script with the groovy script engine via the script file plugin
                final String scriptPath = Storage.toPath(TestHelper.generateTempFileFromResource(
                        GremlinDriverIntegrateTest.class,
                        "generate-shouldRebindTraversalSourceVariables.groovy", ""));
                final Map<String,Object> pluginConfig = new HashMap<>();
                pluginConfig.put("files", Collections.singletonList(scriptPath));
                settings.scriptEngines.get("gremlin-groovy").plugins
                        .put(ScriptFileGremlinPlugin.class.getName(), pluginConfig);
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            }
            break;
        case "shouldFailWithBadClientSideSerialization":
            // replace the "custom" entry of the first serializer that has one
            final List<String> customEntries = Arrays.asList(
                    JsonBuilder.class.getName() + ";" + JsonBuilderGryoSerializer.class.getName(),
                    Color.class.getName());
            settings.serializers.stream()
                    .filter(serializer -> serializer.config.containsKey("custom"))
                    .findFirst().get()
                    .config.put("custom", customEntries);
            break;
        case "shouldExecuteScriptInSessionOnTransactionalGraph":
        case "shouldExecuteSessionlessScriptOnTransactionalGraph":
        case "shouldExecuteScriptInSessionOnTransactionalWithManualTransactionsGraph":
        case "shouldExecuteInSessionAndSessionlessWithoutOpeningTransaction":
        case "shouldManageTransactionsInSession":
            tryIncludeNeo4jGraph(settings);
            break;
        case "shouldRequireAliasedGraphVariablesInStrictTransactionMode":
            settings.strictTransactionManagement = true;
            break;
        case "shouldAliasGraphVariablesInStrictTransactionMode":
            settings.strictTransactionManagement = true;
            tryIncludeNeo4jGraph(settings);
            break;
        case "shouldProcessSessionRequestsInOrderAfterTimeout":
            settings.evaluationTimeout = 250;
            settings.threadPoolWorker = 1;
            break;
        case "shouldProcessTraversalInterruption":
        case "shouldProcessEvalInterruption":
            // shorter than the 5 second sleep those tests submit
            settings.evaluationTimeout = 1500;
            break;
    }

    return settings;
}
@Test
public void shouldInterceptRequests() throws Exception {
    final int requestsToMake = 32;
    final AtomicInteger handshakeCount = new AtomicInteger(0);

    // the interceptor only counts invocations and passes the request through untouched
    final Cluster cluster = TestClientFactory.build()
            .minConnectionPoolSize(1)
            .maxConnectionPoolSize(1)
            .handshakeInterceptor(request -> {
                handshakeCount.incrementAndGet();
                return request;
            })
            .create();

    try {
        final Client client = cluster.connect();
        int sent = 0;
        while (sent < requestsToMake) {
            final int expected = sent + 1;
            assertEquals(expected, client.submit(sent + "+1").all().get().get(0).getInt());
            sent++;
        }
    } finally {
        cluster.close();
    }

    // with a pool of exactly one connection the interceptor is expected to have run exactly once
    assertEquals(1, handshakeCount.get());
}
@Test
public void shouldReportErrorWhenRequestCantBeSerialized() throws Exception {
    final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V3D0).create();
    try {
        final Client aliased = cluster.connect().alias("g");

        try {
            // a java.awt.Color is sent as a parameter and the request is expected to fail on serialization
            final Map<String, Object> bindings = new HashMap<>();
            bindings.put("r", Color.RED);
            aliased.submit("r", bindings).all().get();
            fail("Should have thrown exception over bad serialization");
        } catch (Exception ex) {
            final Throwable root = ExceptionHelper.getRootCause(ex);
            assertThat(root, instanceOf(ResponseException.class));
            final ResponseException responseException = (ResponseException) root;
            assertEquals(ResponseStatusCode.REQUEST_ERROR_SERIALIZATION, responseException.getResponseStatusCode());
            assertTrue(ex.getMessage().contains("An error occurred during serialization of this request"));
        }

        // a serialization failure must not leave the client unusable - a follow-up request still has to work
        final int sum = aliased.submit("1+1").all().get().get(0).getInt();
        assertEquals(2, sum);
    } finally {
        cluster.close();
    }
}
@Test
public void shouldProcessTraversalInterruption() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    final Client client = cluster.connect();

    try {
        // the traversal sleeps for 5 seconds and the request is expected to end in a server-side timeout
        final ResultSet slowTraversal = client.submit("g.inject(1).sideEffect{Thread.sleep(5000)}");
        slowTraversal.all().get();
        fail("Should have timed out");
    } catch (Exception ex) {
        final Throwable cause = ex.getCause();
        final ResponseStatusCode actual = ((ResponseException) cause).getResponseStatusCode();
        assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, actual);
    } finally {
        cluster.close();
    }
}
@Test
public void shouldProcessEvalInterruption() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    final Client client = cluster.connect();

    try {
        // the script sleeps for 5 seconds and the request is expected to end in a server-side timeout
        final ResultSet slowScript = client.submit("Thread.sleep(5000);'done'");
        slowScript.all().get();
        fail("Should have timed out");
    } catch (Exception ex) {
        final Throwable cause = ex.getCause();
        final ResponseStatusCode actual = ((ResponseException) cause).getResponseStatusCode();
        assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, actual);
    } finally {
        cluster.close();
    }
}
@Test
public void shouldKeepAliveForWebSockets() throws Exception {
    // a single connection keeps the number of pings predictable - more connections would each ping on their own
    // and make the assertion at the end much harder to reason about
    final Cluster cluster = TestClientFactory.build()
            .minConnectionPoolSize(1)
            .maxConnectionPoolSize(1)
            .keepAliveInterval(1002)
            .create();
    try {
        final Client client = cluster.connect();

        // a burst of requests schedules and deschedules a lot of ping jobs
        for (int before = 0; before < 500; before++) {
            final int sum = client.submit("1+1").all().get().get(0).getInt();
            assertEquals(2, sum);
        }

        // stay quiet for a while so that the driver gets the chance to ping in the background
        Thread.sleep(3000);

        // resume sending requests to verify that no extra pings fire off afterwards
        for (int after = 0; after < 500; after++) {
            final int sum = client.submit("1+1").all().get().get(0).getInt();
            assertEquals(2, sum);
        }

        // at least one ping must have been sent but there really shouldn't be more than 3
        final String pingLogMessage = "Sending ping frame to the server";
        final long pings = recordingAppender.getMessages().stream()
                .filter(message -> message.contains(pingLogMessage))
                .count();
        assertThat(pings, allOf(greaterThan(0L), lessThanOrEqualTo(3L)));
    } finally {
        cluster.close();
    }
}
@Test
public void shouldKeepAliveForWebSocketsWithNoInFlightRequests() throws Exception {
    // a single connection keeps the number of pings predictable - more connections would each ping on their own
    // and make the assertion at the end much harder to reason about
    final Cluster cluster = TestClientFactory.build()
            .minConnectionPoolSize(1)
            .maxConnectionPoolSize(1)
            .keepAliveInterval(1002)
            .create();
    try {
        final Client client = cluster.connect();

        // initialize explicitly so that the client holds an active connection without any in-flight requests
        client.init();

        // stay quiet for a while so that the driver gets the chance to ping in the background
        Thread.sleep(3000);

        // start sending requests to verify that no extra pings fire off afterwards
        for (int request = 0; request < 500; request++) {
            final int sum = client.submit("1+1").all().get().get(0).getInt();
            assertEquals(2, sum);
        }

        // at least one ping must have been sent but there really shouldn't be more than 3
        final String pingLogMessage = "Sending ping frame to the server";
        final long pings = recordingAppender.getMessages().stream()
                .filter(message -> message.contains(pingLogMessage))
                .count();
        assertThat(pings, allOf(greaterThan(0L), lessThanOrEqualTo(3L)));
    } finally {
        cluster.close();
    }
}
@Test
public void shouldEventuallySucceedAfterChannelLevelError() throws Exception {
    // maxContentLength is set very low so that the response to the first script exceeds it
    final Cluster cluster = TestClientFactory.build()
            .reconnectInterval(500)
            .maxContentLength(64)
            .create();
    final Client client = cluster.connect();

    try {
        final String oversizedResultScript = "def x = '';(0..<1024).each{x = x + '$it'};x";
        try {
            client.submit(oversizedResultScript).all().get();
            fail("Request should have failed because it exceeded the max content length allowed");
        } catch (Exception ex) {
            final String rootMessage = ExceptionHelper.getRootCause(ex).getMessage();
            assertThat(rootMessage, containsString("Max frame length of 64 has been exceeded."));
        }

        // the client has to be usable again after the failure above
        final int sum = client.submit("1+1").all().join().get(0).getInt();
        assertEquals(2, sum);
    } finally {
        cluster.close();
    }
}
@Test
public void shouldEventuallySucceedAfterMuchFailure() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    final Client client = cluster.connect();

    try {
        // this was verified independently with 10000 iterations - 1000 keeps the test reasonably fast
        for (int attempt = 0; attempt < 1000; attempt++) {
            try {
                client.submit("1 + 9 9").all().join().get(0).getInt();
                fail("Should not have gone through due to syntax error");
            } catch (Exception ex) {
                assertThat(ExceptionHelper.getRootCause(ex), instanceOf(ResponseException.class));
            }
        }

        // after all of those failures a valid request still has to succeed
        final int sum = client.submit("1+1").all().join().get(0).getInt();
        assertEquals(2, sum);
    } finally {
        cluster.close();
    }
}
/**
 * Stops the server, verifies that a request fails with {@link NoHostAvailableException}, restarts the server and
 * then retries until a request succeeds. The test fails if none of the retries succeeds.
 */
@Test
public void shouldEventuallySucceedOnSameServerWithScript() throws Exception {
    stopServer();

    final Cluster cluster = TestClientFactory.build().validationRequest("g.inject()").create();

    // everything after cluster creation is guarded so that the cluster is closed even when an assertion fails
    try {
        final Client client = cluster.connect();

        try {
            client.submit("1+1").all().join().get(0).getInt();
            fail("Should not have gone through because the server is not running");
        } catch (Exception i) {
            assertThat(i, instanceOf(NoHostAvailableException.class));
            final Throwable root = ExceptionHelper.getRootCause(i);
            assertThat(root, instanceOf(ConnectException.class));
        }

        startServer();

        // default reconnect time is 1 second so wait some extra time to be sure it has time to try to bring it
        // back to life. usually this passes on the first attempt, but docker is sometimes slow and we get failures
        // waiting for Gremlin Server to pop back up
        boolean succeeded = false;
        for (int ix = 3; ix < 13 && !succeeded; ix++) {
            TimeUnit.SECONDS.sleep(ix);
            try {
                final int result = client.submit("1+1").all().join().get(0).getInt();
                assertEquals(2, result);
                succeeded = true;
            } catch (Exception ignored) {
                // a failed attempt is expected while the server is still coming up - just try again
                logger.warn("Attempt {} failed on shouldEventuallySucceedOnSameServerWithScript", ix);
            }
        }

        // without this check the test would pass even if every single attempt above had failed
        assertTrue("Client did not recover after the server was restarted", succeeded);
    } finally {
        cluster.close();
    }
}
@Test
public void shouldEventuallySucceedWithRoundRobin() throws Exception {
    // an address that is not expected to run Gremlin Server is added as an extra contact point
    final String noGremlinServer = "74.125.225.19";
    final Cluster cluster = TestClientFactory.build().addContactPoint(noGremlinServer).create();

    try {
        final Client client = cluster.connect();
        client.init();

        // the first host is dead on init so every one of these requests should succeed on localhost
        for (int request = 0; request < 5; request++) {
            final int sum = client.submit("1+1").all().join().get(0).getInt();
            assertEquals(2, sum);
        }
    } finally {
        cluster.close();
    }
}
@Test
public void shouldHandleResultsOfAllSizes() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    final Client client = cluster.connect();

    try {
        // drops all vertices, creates one vertex up front plus one more per loop iteration (driven by the "size"
        // parameter) and finally returns all vertices
        final String script = "g.V().drop().iterate();\n" +
                "\n" +
                "List ids = new ArrayList();\n" +
                "\n" +
                "int ii = 0;\n" +
                "Vertex v = graph.addVertex();\n" +
                "v.property(\"ii\", ii);\n" +
                "v.property(\"sin\", Math.sin(ii));\n" +
                "ids.add(v.id());\n" +
                "\n" +
                "Random rand = new Random();\n" +
                "for (; ii < size; ii++) {\n" +
                " v = graph.addVertex();\n" +
                " v.property(\"ii\", ii);\n" +
                " v.property(\"sin\", Math.sin(ii/5.0));\n" +
                " Vertex u = graph.vertices(ids.get(rand.nextInt(ids.size()))).next();\n" +
                " v.addEdge(\"linked\", u);\n" +
                " ids.add(u.id());\n" +
                " ids.add(v.id());\n" +
                "}\n" +
                "g.V()";

        final List<Integer> expectedCounts = Arrays.asList(1, 10, 20, 50, 75, 100, 250, 500, 750, 1000, 5000, 10000);
        for (final int expectedCount : expectedCounts) {
            // one vertex is created before the loop, hence the parameter is one less than the expected count
            final Map<String, Object> bindings = new HashMap<>();
            bindings.put("size", expectedCount - 1);

            final ResultSet vertices = client.submit(script, bindings);
            assertEquals(expectedCount, vertices.all().get().size());
        }
    } finally {
        cluster.close();
    }
}
@Test
public void shouldFailWithBadClientSideSerialization() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    final Client client = cluster.connect();

    try {
        final ResultSet colorResult = client.submit("java.awt.Color.RED");

        try {
            colorResult.all().join();
            fail("Should have thrown exception over bad serialization");
        } catch (Exception ex) {
            final Throwable root = ExceptionHelper.getRootCause(ex);
            assertThat(root, instanceOf(ResponseException.class));
            final ResponseException responseException = (ResponseException) root;
            assertEquals(ResponseStatusCode.SERVER_ERROR_SERIALIZATION, responseException.getResponseStatusCode());
        }

        // a serialization failure must not leave the client unusable - a follow-up request still has to work
        final int sum = client.submit("1+1").all().get().get(0).getInt();
        assertEquals(2, sum);
    } finally {
        cluster.close();
    }
}
@Test
public void shouldFailWithScriptExecutionException() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    final Client client = cluster.connect();

    try {
        try {
            client.submit("1/0").all().join();
            fail("Should have thrown exception over division by zero");
        } catch (Exception ex) {
            final Throwable root = ExceptionHelper.getRootCause(ex);
            assertTrue(root instanceof ResponseException);
            assertThat(root.getMessage(), endsWith("Division by zero"));

            // the remote exception details have to be handed through to the client
            final ResponseException responseException = (ResponseException) root;
            assertEquals("java.lang.ArithmeticException", responseException.getRemoteExceptionHierarchy().get().get(0));
            assertEquals(1, responseException.getRemoteExceptionHierarchy().get().size());
            assertThat(responseException.getRemoteStackTrace().get(), containsString("Division by zero"));
        }

        // a failing script must not leave the client unusable - a follow-up request still has to work
        final int sum = client.submit("1+1").all().get().get(0).getInt();
        assertEquals(2, sum);
    } finally {
        cluster.close();
    }
}
@Test
public void shouldProcessRequestsOutOfOrder() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    try {
        final Client client = cluster.connect();

        // the slow request is submitted first, the fast one right after it
        final ResultSet slowResult = client.submit("Thread.sleep(5000);'five'");
        final ResultSet fastResult = client.submit("'zero'");

        final CompletableFuture<List<Result>> slowFuture = slowResult.all();
        final CompletableFuture<List<Result>> fastFuture = fastResult.all();

        final long start = System.nanoTime();

        // the fast request has to complete while the slow one is still running
        assertFalse(slowFuture.isDone());
        final String zero = fastFuture.get().get(0).getString();
        assertEquals("zero", zero);
        logger.info("Eval of 'zero' complete: " + TimeUtil.millisSince(start));

        assertFalse(slowFuture.isDone());
        final String five = slowFuture.get(10, TimeUnit.SECONDS).get(0).getString();
        assertEquals("five", five);
        logger.info("Eval of 'five' complete: " + TimeUtil.millisSince(start));
    } finally {
        cluster.close();
    }
}
/**
 * This test validates that the session requests are processed in-order on the server. The order of results
 * returned to the client might be different though since each result is handled by a different executor thread.
 */
@Test
public void shouldProcessSessionRequestsInOrder() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    try {
        final Client client = cluster.connect(name.getMethodName());

        final ResultSet first = client.submit("Thread.sleep(5000);g.V().fold().coalesce(unfold(), __.addV('person'))");
        final ResultSet second = client.submit("g.V().count()");

        final CompletableFuture<List<Result>> futureFirst = first.all();
        final CompletableFuture<List<Result>> futureSecond = second.all();

        final CountDownLatch latch = new CountDownLatch(2);
        final List<Object> results = new ArrayList<>();
        final ExecutorService executor = Executors.newSingleThreadExecutor();

        // the executor is shut down in a finally block so that a failing assertion doesn't leave its thread behind
        try {
            futureFirst.thenAcceptAsync(r -> {
                results.add(r.get(0).getVertex().label());
                latch.countDown();
            }, executor);

            futureSecond.thenAcceptAsync(r -> {
                results.add(r.get(0).getLong());
                latch.countDown();
            }, executor);

            // wait for both results - checking the return value makes a timeout show up as exactly that and
            // guarantees that "results" isn't read while the executor thread may still be writing to it
            assertThat("Both results should arrive within 30 seconds", latch.await(30000, TimeUnit.MILLISECONDS));

            assertThat("Should contain 2 results", results.size() == 2);
            assertThat("The numeric result should be 1", results.contains(1L));
            assertThat("The string result contain label person", results.contains("person"));
        } finally {
            executor.shutdown();
        }
    } finally {
        cluster.close();
    }
}
@Test
public void shouldWaitForAllResultsToArrive() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    try {
        final Client client = cluster.connect();
        final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");

        // poll until the result set reports completion - it must never claim more items than were requested
        int polls = 0;
        while (!results.allItemsAvailable()) {
            assertTrue(results.getAvailableItemCount() < 10);
            polls++;
            Thread.sleep(100);
        }

        // the loop is expected to have run at least once before everything arrived
        assertTrue(polls > 0);
        assertEquals(9, results.getAvailableItemCount());
    } finally {
        cluster.close();
    }
}
@Test
public void shouldStream() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    final Client client = cluster.connect();

    try {
        final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");

        // every streamed item is doubled and compared against the doubled running count
        final AtomicInteger seen = new AtomicInteger(0);
        results.stream()
                .map(item -> item.get(Integer.class) * 2)
                .forEach(doubled -> assertEquals(seen.incrementAndGet() * 2, Integer.parseInt(doubled.toString())));

        assertEquals(9, seen.get());
        assertThat(results.allItemsAvailable(), is(true));

        // a second stream over the same result set has nothing left to offer
        assertThat(results.stream().iterator().hasNext(), is(false));
    } finally {
        cluster.close();
    }
}
@Test
public void shouldIterate() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    final Client client = cluster.connect();

    try {
        final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");

        // the items are expected to come back in order, starting at 1
        final Iterator<Result> items = results.iterator();
        int seen = 0;
        while (items.hasNext()) {
            seen++;
            assertEquals(seen, items.next().getInt());
        }

        assertEquals(9, seen);
        assertThat(results.allItemsAvailable(), is(true));

        // a second iterator over the same result set has nothing left to offer
        assertThat(results.iterator().hasNext(), is(false));
    } finally {
        cluster.close();
    }
}
@Test
public void shouldGetSomeThenSomeMore() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    final Client client = cluster.connect();

    try {
        final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");

        // nine items are available: the first request takes 5, the second the remaining 4, the third nothing
        final CompletableFuture<List<Result>> batch1 = results.some(5);
        final CompletableFuture<List<Result>> batch2 = results.some(5);
        final CompletableFuture<List<Result>> batchNothingLeft = results.some(5);

        final List<Result> firstFive = batch1.get();
        assertEquals(5, firstFive.size());
        for (int ix = 0; ix < 5; ix++) {
            assertEquals(ix + 1, firstFive.get(ix).getInt());
        }

        final List<Result> lastFour = batch2.get();
        assertEquals(4, lastFour.size());
        for (int ix = 0; ix < 4; ix++) {
            assertEquals(ix + 6, lastFour.get(ix).getInt());
        }

        assertEquals(0, batchNothingLeft.get().size());
    } finally {
        cluster.close();
    }
}
@Test
public void shouldGetOneThenSomeThenSomeMore() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    final Client client = cluster.connect();

    try {
        final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");

        // nine items are available: one is taken on its own, then 4, then the remaining 4, then nothing
        final Result one = results.one();
        final CompletableFuture<List<Result>> batch1 = results.some(4);
        final CompletableFuture<List<Result>> batch2 = results.some(5);
        final CompletableFuture<List<Result>> batchNothingLeft = results.some(5);

        assertEquals(1, one.getInt());

        final List<Result> nextFour = batch1.get();
        assertEquals(4, nextFour.size());
        for (int ix = 0; ix < 4; ix++) {
            assertEquals(ix + 2, nextFour.get(ix).getInt());
        }

        final List<Result> lastFour = batch2.get();
        assertEquals(4, lastFour.size());
        for (int ix = 0; ix < 4; ix++) {
            assertEquals(ix + 6, lastFour.get(ix).getInt());
        }

        assertEquals(0, batchNothingLeft.get().size());
    } finally {
        cluster.close();
    }
}
@Test
public void shouldAvoidDeadlockOnCallToResultSetDotAll() throws Exception {
    // This test arose from this issue: https://github.org/apache/tinkerpop/tinkerpop3/issues/515
    //
    // ResultSet.all returns a CompletableFuture that blocks on the worker pool until isExhausted returns false.
    // isExhausted in turn needs a thread on the worker pool to even return. So its totally possible to consume all
    // threads on the worker pool waiting for .all to finish such that you can't even get one to wait for
    // isExhausted to run.
    //
    // Note that all() doesn't work as described above anymore. It waits for callback on readComplete rather
    // than blocking on isExhausted.
    final int workerPoolSizeForDriver = 2;

    // the number of requests 4 times the size of the worker pool as this originally did produce the problem
    // described above in the javadoc of the test (though an equivalent number also produced it), but this has
    // been tested to much higher multiples and passes. note that the maxWaitForConnection setting is high so
    // that the client doesn't timeout waiting for an available connection. obviously this can also be fixed
    // by increasing the maxConnectionPoolSize.
    final int requests = workerPoolSizeForDriver * 4;
    final Cluster cluster = TestClientFactory.build()
            .workerPoolSize(workerPoolSizeForDriver)
            .maxWaitForConnection(300000)
            .create();

    try {
        final Client client = cluster.connect();

        final CountDownLatch latch = new CountDownLatch(requests);

        // one typed holder per request - a list avoids the raw generic array and the unchecked cast it required
        final List<AtomicReference<List<Result>>> refs = IntStream.range(0, requests)
                .mapToObj(ix -> new AtomicReference<List<Result>>())
                .collect(Collectors.toList());

        refs.forEach(ref ->
                client.submitAsync("Thread.sleep(5000);[1,2,3,4,5,6,7,8,9]").thenAccept(rs ->
                        rs.all().thenAccept(ref::set).thenRun(latch::countDown)));

        // countdown should have reached zero as results should have eventually been all returned and processed
        assertTrue(latch.await(30, TimeUnit.SECONDS));

        final List<Integer> expected = IntStream.range(1, 10).boxed().collect(Collectors.toList());

        // Result::getInt is boxed by the stream itself, which replaces the deprecated Integer constructor
        refs.forEach(ref ->
                assertTrue(expected.containsAll(ref.get().stream().map(Result::getInt).collect(Collectors.toList()))));
    } finally {
        cluster.close();
    }
}
/**
 * Initializes a client, stops the server and then closes the {@link Cluster}. There are no explicit assertions -
 * the test passes as long as none of those calls throws.
 */
@Test
public void shouldCloseWithServerDown() throws Exception {
final Cluster cluster = TestClientFactory.open();
try {
cluster.connect().init();
stopServer();
} finally {
// when stopServer() returned normally this close runs against a server that has been stopped
cluster.close();
}
}
@Test
public void shouldMarkHostDeadSinceServerIsDown() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    try {
        assertEquals(0, cluster.availableHosts().size());
        final Client initialClient = cluster.connect().init();
        assertEquals(1, cluster.availableHosts().size());

        stopServer();

        // A second client is created at this point and it fails to initialize, while the initial client still
        // considers the host connected. Having failed during initialization, the second client holds no
        // established connection and therefore can't tell whether the host is really unreachable. It won't add
        // the host to the load balancer, but it won't remove it either if it is already there - that is left to
        // the client that added it. The initial client finds out about the failure with its next keepAlive
        // message or its next request, whichever comes first. The second case is simulated here by sending a new
        // request on the initial client: the request fails (the server is down) and the client should then mark
        // the host as unavailable.
        cluster.connect().init();

        try {
            initialClient.submit("1+1").all().join();
            fail("Expecting an exception because the server is shut down.");
        } catch (Exception ex) {
            // the exception itself is of no interest - only its effect on the host availability is
        }

        assertEquals(0, cluster.availableHosts().size());
    } finally {
        cluster.close();
    }
}
@Test
public void shouldFailWithBadServerSideSerialization() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    final Client client = cluster.connect();

    try {
        // the script's result is expected to fail serialization on the server
        final ResultSet variablesResult = client.submit("TinkerGraph.open().variables()");

        try {
            variablesResult.all().join();
            fail();
        } catch (Exception ex) {
            final Throwable root = ExceptionHelper.getRootCause(ex);
            assertTrue(root instanceof ResponseException);
            final ResponseException responseException = (ResponseException) root;
            assertEquals(ResponseStatusCode.SERVER_ERROR_SERIALIZATION, responseException.getResponseStatusCode());
        }

        // a serialization failure must not leave the client unusable - a follow-up request still has to work
        final int sum = client.submit("1+1").all().get().get(0).getInt();
        assertEquals(2, sum);
    } finally {
        cluster.close();
    }
}
/**
 * Configures a {@code GraphBinaryMessageSerializerV1} with {@code serializeResultToString=true}
 * and asserts that a graph returned by the script arrives as its string representation.
 */
@Test
public void shouldSerializeToStringWhenRequestedGraphBinaryV1() throws Exception {
    final Map<String, Object> m = new HashMap<>();
    m.put("serializeResultToString", true);
    final GraphBinaryMessageSerializerV1 serializer = new GraphBinaryMessageSerializerV1();
    serializer.configure(m, null);

    final Cluster cluster = TestClientFactory.build().serializer(serializer).create();
    final Client client = cluster.connect();

    // close the cluster in finally (as the sibling Gryo tests do) so a failed assertion
    // does not leave the cluster open
    try {
        final ResultSet resultSet = client.submit("TinkerFactory.createClassic()");
        final List<Result> results = resultSet.all().join();
        assertEquals(1, results.size());
        assertEquals("tinkergraph[vertices:6 edges:6]", results.get(0).getString());
    } finally {
        cluster.close();
    }
}
/**
 * Configures a {@code GryoMessageSerializerV1d0} with {@code serializeResultToString=true}
 * and asserts that a graph returned by the script arrives as its string representation.
 */
@Test
public void shouldSerializeToStringWhenRequestedGryoV1() throws Exception {
    final Map<String, Object> serializerConfig = new HashMap<>();
    serializerConfig.put("serializeResultToString", true);

    final GryoMessageSerializerV1d0 gryo = new GryoMessageSerializerV1d0();
    gryo.configure(serializerConfig, null);

    final Cluster cluster = TestClientFactory.build().serializer(gryo).create();
    final Client client = cluster.connect();
    try {
        final List<Result> stringified = client.submit("TinkerFactory.createClassic()").all().join();
        assertEquals(1, stringified.size());

        final String graphAsString = stringified.get(0).getString();
        assertEquals("tinkergraph[vertices:6 edges:6]", graphAsString);
    } finally {
        cluster.close();
    }
}
/**
 * Configures a {@code GryoMessageSerializerV3d0} with {@code serializeResultToString=true}
 * and asserts that a graph returned by the script arrives as its string representation.
 */
@Test
public void shouldSerializeToStringWhenRequestedGryoV3() throws Exception {
    final Map<String, Object> serializerConfig = new HashMap<>();
    serializerConfig.put("serializeResultToString", true);

    final GryoMessageSerializerV3d0 gryo = new GryoMessageSerializerV3d0();
    gryo.configure(serializerConfig, null);

    final Cluster cluster = TestClientFactory.build().serializer(gryo).create();
    final Client client = cluster.connect();
    try {
        final List<Result> stringified = client.submit("TinkerFactory.createClassic()").all().join();
        assertEquals(1, stringified.size());

        final String graphAsString = stringified.get(0).getString();
        assertEquals("tinkergraph[vertices:6 edges:6]", graphAsString);
    } finally {
        cluster.close();
    }
}
/**
 * Registers {@code JsonBuilder} with its Gryo serializer through the "custom" configuration key
 * of a {@code GryoMessageSerializerV1d0} and checks the string form of a returned builder.
 */
@Test
public void shouldDeserializeWithCustomClassesV1() throws Exception {
    final String customEntry = String.format("%s;%s", JsonBuilder.class.getCanonicalName(), JsonBuilderGryoSerializer.class.getCanonicalName());
    final Map<String, Object> serializerConfig = new HashMap<>();
    serializerConfig.put("custom", Collections.singletonList(customEntry));

    final GryoMessageSerializerV1d0 gryo = new GryoMessageSerializerV1d0();
    gryo.configure(serializerConfig, null);

    final Cluster cluster = TestClientFactory.build().serializer(gryo).create();
    final Client client = cluster.connect();
    try {
        final String script = "b = new groovy.json.JsonBuilder();b.people{person {fname 'stephen'\nlname 'mallette'}};b";
        final String expectedJson = "{\"people\":{\"person\":{\"fname\":\"stephen\",\"lname\":\"mallette\"}}}";

        final List<Result> built = client.submit(script).all().join();
        assertEquals(expectedJson, built.get(0).getString());
    } finally {
        cluster.close();
    }
}
/**
 * Registers {@code JsonBuilder} with its Gryo serializer through the "custom" configuration key
 * of a {@code GryoMessageSerializerV3d0} and checks the string form of a returned builder.
 */
@Test
public void shouldDeserializeWithCustomClassesV3() throws Exception {
    final String customEntry = String.format("%s;%s", JsonBuilder.class.getCanonicalName(), JsonBuilderGryoSerializer.class.getCanonicalName());
    final Map<String, Object> serializerConfig = new HashMap<>();
    serializerConfig.put("custom", Collections.singletonList(customEntry));

    final GryoMessageSerializerV3d0 gryo = new GryoMessageSerializerV3d0();
    gryo.configure(serializerConfig, null);

    final Cluster cluster = TestClientFactory.build().serializer(gryo).create();
    final Client client = cluster.connect();
    try {
        final String script = "b = new groovy.json.JsonBuilder();b.people{person {fname 'stephen'\nlname 'mallette'}};b";
        final String expectedJson = "{\"people\":{\"person\":{\"fname\":\"stephen\",\"lname\":\"mallette\"}}}";

        final List<Result> built = client.submit(script).all().join();
        assertEquals(expectedJson, built.get(0).getString());
    } finally {
        cluster.close();
    }
}
/**
 * Fetches vertex 1 of the modern graph using the GraphSON 1.0 serializer and verifies the
 * map structure (id, label, type and the nested name/age property lists) it deserializes to.
 */
@Test
public void shouldWorkWithGraphSONV1Serialization() throws Exception {
    final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V1D0).create();
    final Client client = cluster.connect();
    try {
        final List<Result> fetched = client.submit("TinkerFactory.createModern().traversal().V(1)").all().join();
        assertEquals(1, fetched.size());

        // top-level vertex map
        final Map<String, Object> vertexMap = fetched.get(0).get(Map.class);
        assertEquals(4, vertexMap.size());
        assertEquals(1, vertexMap.get("id"));
        assertEquals("person", vertexMap.get("label"));
        assertEquals("vertex", vertexMap.get("type"));

        final Map<String, Object> propertyLists = (Map<String, Object>) vertexMap.get("properties");
        assertEquals(2, propertyLists.size());

        // "name" holds a single property entry
        final List<Object> nameEntries = (List<Object>) propertyLists.get("name");
        assertEquals(1, nameEntries.size());
        final Map<String, Object> nameEntry = (Map<String, Object>) nameEntries.get(0);
        assertEquals(2, nameEntry.size());
        assertEquals(0L, nameEntry.get("id"));
        assertEquals("marko", nameEntry.get("value"));

        // "age" holds a single property entry
        final List<Object> ageEntries = (List<Object>) propertyLists.get("age");
        assertEquals(1, ageEntries.size());
        final Map<String, Object> ageEntry = (Map<String, Object>) ageEntries.get(0);
        assertEquals(2, ageEntry.size());
        assertEquals(1L, ageEntry.get("id"));
        assertEquals(29, ageEntry.get("value"));
    } finally {
        cluster.close();
    }
}
/**
 * Fetches vertex 1 of the modern graph using the GraphSON 2.0 serializer and verifies the
 * id, label, property count and name/age values of the resulting {@code DetachedVertex}.
 */
@Test
public void shouldWorkWithGraphSONV2Serialization() throws Exception {
    final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V2D0).create();
    final Client client = cluster.connect();
    try {
        final List<Result> fetched = client.submit("TinkerFactory.createModern().traversal().V(1)").all().join();
        assertEquals(1, fetched.size());

        final Vertex marko = fetched.get(0).get(DetachedVertex.class);
        assertEquals(1, marko.id());
        assertEquals("person", marko.label());
        assertEquals(2, IteratorUtils.count(marko.properties()));
        assertEquals("marko", marko.value("name"));

        // the age value is compared through its string form
        final String ageText = marko.value("age").toString();
        assertEquals(29, Integer.parseInt(ageText));
    } finally {
        cluster.close();
    }
}
/**
 * Returns {@code java.time.Instant.EPOCH} from the server using the GraphSON 2.0 serializer
 * and asserts it deserializes to an equal {@code Instant}.
 */
@Test
public void shouldWorkWithGraphSONExtendedV2Serialization() throws Exception {
    final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V2D0).create();
    final Client client = cluster.connect();
    try {
        final List<Result> fetched = client.submit("java.time.Instant.EPOCH").all().join();
        assertEquals(1, fetched.size());

        final Instant roundTripped = fetched.get(0).get(Instant.class);
        assertEquals(Instant.EPOCH, roundTripped);
    } finally {
        cluster.close();
    }
}
/**
 * Fetches vertex 1 of the modern graph using the GraphSON 3.0 serializer and verifies the
 * id, label, property count and name/age values of the resulting {@code DetachedVertex}.
 */
@Test
public void shouldWorkWithGraphSONV3Serialization() throws Exception {
    final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V3D0).create();
    final Client client = cluster.connect();
    try {
        final List<Result> fetched = client.submit("TinkerFactory.createModern().traversal().V(1)").all().join();
        assertEquals(1, fetched.size());

        final Vertex marko = fetched.get(0).get(DetachedVertex.class);
        assertEquals(1, marko.id());
        assertEquals("person", marko.label());
        assertEquals(2, IteratorUtils.count(marko.properties()));
        assertEquals("marko", marko.value("name"));

        // the age value is compared through its string form
        final String ageText = marko.value("age").toString();
        assertEquals(29, Integer.parseInt(ageText));
    } finally {
        cluster.close();
    }
}
/**
 * Returns {@code java.time.Instant.EPOCH} from the server using the GraphSON 3.0 serializer
 * and asserts it deserializes to an equal {@code Instant}.
 */
@Test
public void shouldWorkWithGraphSONExtendedV3Serialization() throws Exception {
    final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V3D0).create();
    final Client client = cluster.connect();
    try {
        final List<Result> fetched = client.submit("java.time.Instant.EPOCH").all().join();
        assertEquals(1, fetched.size());

        final Instant roundTripped = fetched.get(0).get(Instant.class);
        assertEquals(Instant.EPOCH, roundTripped);
    } finally {
        cluster.close();
    }
}
/**
 * Fetches vertex 1 of the modern graph using the GraphBinary 1.0 serializer and verifies the
 * id and label of the resulting {@code ReferenceVertex}.
 */
@Test
public void shouldWorkWithGraphBinaryV1Serialization() throws Exception {
    final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHBINARY_V1D0).create();
    final Client client = cluster.connect();
    try {
        final List<Result> fetched = client.submit("TinkerFactory.createModern().traversal().V(1)").all().join();
        assertEquals(1, fetched.size());

        final Vertex reference = fetched.get(0).get(ReferenceVertex.class);
        assertEquals(1, reference.id());
        assertEquals("person", reference.label());
    } finally {
        cluster.close();
    }
}
/**
 * Builds a cluster with {@code maxContentLength(1)} and asserts that receiving a larger
 * response fails with the expected "max frame length" message.
 */
@Test
public void shouldFailClientSideWithTooLargeAResponse() {
    final Cluster cluster = TestClientFactory.build().maxContentLength(1).create();
    final Client client = cluster.connect();

    try {
        // concatenation of the numbers 0..99 - far larger than the single byte allowed above
        final String fatty = IntStream.range(0, 100).mapToObj(String::valueOf).collect(Collectors.joining());
        client.submit("'" + fatty + "'").all().get();
        fail("Should throw an exception.");
    } catch (Exception re) {
        final Throwable root = ExceptionHelper.getRootCause(re);
        // assertEquals reports the actual message on mismatch and copes with a null message,
        // which assertTrue(msg.equals(...)) does not
        assertEquals("Max frame length of 1 has been exceeded.", root.getMessage());
    } finally {
        cluster.close();
    }
}
/**
 * Sends a request whose bindings contain a {@code null} key and asserts the root-cause
 * message returned for the invalid binding.
 */
@Test
public void shouldReturnNiceMessageFromOpSelector() {
    final Cluster cluster = TestClientFactory.build().create();
    final Client client = cluster.connect();

    try {
        // parameterized (not raw) map; HashMap permits the null key this test relies on
        final Map<String, Object> m = new HashMap<>();
        m.put(null, "a null key will force a throw of OpProcessorException in message validation");
        client.submit("1+1", m).all().get();
        fail("Should throw an exception.");
    } catch (Exception re) {
        final Throwable root = ExceptionHelper.getRootCause(re);
        assertEquals("The [eval] message is using one or more invalid binding keys - they must be of type String and cannot be null", root.getMessage());
    } finally {
        cluster.close();
    }
}
/**
 * Defines a variable {@code x} in a session named after the test method and asserts that
 * subsequent requests on the same client can read it.
 */
@Test
public void shouldExecuteScriptInSession() throws Exception {
    final Cluster cluster = TestClientFactory.build().create();
    final Client client = cluster.connect(name.getMethodName());

    // close the cluster in finally so a failed assertion does not leave it open
    try {
        final ResultSet results1 = client.submit("x = [1,2,3,4,5,6,7,8,9]");
        assertEquals(9, results1.all().get().size());

        final ResultSet results2 = client.submit("x[0]+1");
        assertEquals(2, results2.all().get().get(0).getInt());

        final ResultSet results3 = client.submit("x[1]+2");
        assertEquals(4, results3.all().get().get(0).getInt());
    } finally {
        cluster.close();
    }
}
/**
 * Asserts that {@code one()} returns {@code null}, rather than throwing, for a traversal that
 * matches nothing.
 */
@Test
public void shouldNotThrowNoSuchElementException() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    final Client client = cluster.connect();
    try {
        // the traversal has no results, so one() is expected to hand back null without an exception
        final Result nothing = client.submit("g.V().has('name','kadfjaldjfla')").one();
        assertNull(nothing);
    } finally {
        cluster.close();
    }
}
/**
 * Submits {@code g.inject(111)} with the request language set to "gremlin-lang" and asserts
 * the integer result.
 */
@Test
public void shouldEvalInGremlinLang() {
    final Cluster cluster = TestClientFactory.open();
    final Client client = cluster.connect();
    try {
        final RequestOptions gremlinLangOptions = RequestOptions.build().language("gremlin-lang").create();
        final Result injected = client.submit("g.inject(111)", gremlinLangOptions).one();
        assertEquals(111, injected.getInt());
    } finally {
        cluster.close();
    }
}
/**
 * Uses a session, closes the client, and asserts that a further submit on the closed client
 * fails with an {@code IllegalStateException} as root cause.
 */
@Test
public void shouldCloseSession() throws Exception {
    final Cluster cluster = TestClientFactory.build().create();
    final Client client = cluster.connect(name.getMethodName());

    // one try/finally around the whole test so a failure in the early assertions
    // still closes the cluster
    try {
        final ResultSet results1 = client.submit("x = [1,2,3,4,5,6,7,8,9]");
        assertEquals(9, results1.all().get().size());

        final ResultSet results2 = client.submit("x[0]+1");
        assertEquals(2, results2.all().get().get(0).getInt());

        client.close();

        try {
            client.submit("x[0]+1").all().get();
            fail("Should have thrown an exception because the connection is closed");
        } catch (Exception ex) {
            final Throwable root = ExceptionHelper.getRootCause(ex);
            assertThat(root, instanceOf(IllegalStateException.class));
        }
    } finally {
        cluster.close();
    }
}
/**
 * Evaluates {@code TinkerFactory.class.name} in a session without an explicit import and
 * asserts the returned class name.
 */
@Test
public void shouldExecuteScriptInSessionAssumingDefaultedImports() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    final Client client = cluster.connect(name.getMethodName());

    // close the cluster in finally so a failed assertion does not leave it open
    try {
        final ResultSet results1 = client.submit("TinkerFactory.class.name");
        assertEquals(TinkerFactory.class.getName(), results1.all().get().get(0).getString());
    } finally {
        cluster.close();
    }
}
/**
 * In a session, adds a vertex bound to {@code v}, reads it back through several requests, then
 * sets a property, commits, and asserts the resulting value map. Skipped unless
 * {@code assumeNeo4jIsPresent()} passes.
 */
@Test
public void shouldExecuteScriptInSessionOnTransactionalGraph() throws Exception {
    assumeNeo4jIsPresent();

    final Cluster cluster = TestClientFactory.open();
    final Client client = cluster.connect(name.getMethodName());

    // close the cluster in finally so a failed assertion does not leave it open
    try {
        final Vertex vertexBeforeTx = client.submit("v=g.addV(\"person\").property(\"name\",\"stephen\").next()").all().get().get(0).getVertex();
        assertEquals("person", vertexBeforeTx.label());

        final String nameValueFromV = client.submit("g.V().values('name').next()").all().get().get(0).getString();
        assertEquals("stephen", nameValueFromV);

        // "v" was bound by the first request and is read back from the session here
        final Vertex vertexFromBinding = client.submit("v").all().get().get(0).getVertex();
        assertEquals("person", vertexFromBinding.label());

        final Map<String,Object> vertexAfterTx = client.submit("g.V(v).property(\"color\",\"blue\").iterate(); g.tx().commit(); g.V(v).valueMap().by(unfold())").all().get().get(0).get(Map.class);
        assertEquals("stephen", vertexAfterTx.get("name"));
        assertEquals("blue", vertexAfterTx.get("color"));
    } finally {
        cluster.close();
    }
}
/**
 * Switches the session's transaction to manual read/write behavior, performs writes and a
 * commit, interleaves a sessionless request, and asserts the session can still open a
 * transaction and read the committed data. Skipped unless {@code assumeNeo4jIsPresent()} passes.
 */
@Test
public void shouldExecuteScriptInSessionOnTransactionalWithManualTransactionsGraph() throws Exception {
    assumeNeo4jIsPresent();

    final Cluster cluster = TestClientFactory.open();

    // close the cluster in finally so a failed assertion or request does not leave it open
    try {
        final Client client = cluster.connect(name.getMethodName());
        final Client sessionlessClient = cluster.connect();

        client.submit("graph.tx().onReadWrite(Transaction.READ_WRITE_BEHAVIOR.MANUAL);null").all().get();
        client.submit("graph.tx().open()").all().get();

        final Vertex vertexBeforeTx = client.submit("v=g.addV(\"person\").property(\"name\", \"stephen\").next()").all().get().get(0).getVertex();
        assertEquals("person", vertexBeforeTx.label());

        final String nameValueFromV = client.submit("g.V().values(\"name\").next()").all().get().get(0).getString();
        assertEquals("stephen", nameValueFromV);

        final Vertex vertexFromBinding = client.submit("v").all().get().get(0).getVertex();
        assertEquals("person", vertexFromBinding.label());

        client.submit("g.V(v).property(\"color\",\"blue\")").all().get();
        client.submit("g.tx().commit()").all().get();

        // Run a sessionless request to change transaction.readWriteConsumer back to AUTO.
        // This will make the next in-session request fail if consumers aren't ThreadLocal.
        sessionlessClient.submit("g.V().next()").all().get();

        client.submit("g.tx().open()").all().get();

        final Map<String,Object> vertexAfterTx = client.submit("g.V().valueMap().by(unfold())").all().get().get(0).get(Map.class);
        assertEquals("stephen", vertexAfterTx.get("name"));
        assertEquals("blue", vertexAfterTx.get("color"));

        client.submit("g.tx().rollback()").all().get();
    } finally {
        cluster.close();
    }
}
/**
 * Commits a transaction in a session, runs a sessionless read, and asserts that the session's
 * transaction reports closed both before and after the sessionless request. Skipped unless
 * {@code assumeNeo4jIsPresent()} passes.
 */
@Test
public void shouldExecuteInSessionAndSessionlessWithoutOpeningTransaction() throws Exception {
    assumeNeo4jIsPresent();

    final Cluster cluster = TestClientFactory.open();
    try {
        final Client sessionClient = cluster.connect(name.getMethodName());
        final Client sessionlessClient = cluster.connect();

        // open transaction in session, then add vertex and commit
        sessionClient.submit("g.tx().open()").all().get();
        final Vertex vertexBeforeTx = sessionClient.submit("v=g.addV(\"person\").property(\"name\",\"stephen\").next()").all().get().get(0).getVertex();
        assertEquals("person", vertexBeforeTx.label());
        sessionClient.submit("g.tx().commit()").all().get();

        // check that session transaction is closed
        final boolean isOpen = sessionClient.submit("g.tx().isOpen()").all().get().get(0).getBoolean();
        assertThat("Transaction should be closed", isOpen, is(false));

        // run a sessionless read
        sessionlessClient.submit("g.V()").all().get();

        // check that session transaction is still closed
        final boolean isOpenAfterSessionless = sessionClient.submit("g.tx().isOpen()").all().get().get(0).getBoolean();
        assertThat("Transaction should still be closed", isOpenAfterSessionless, is(false));
    } finally {
        cluster.close();
    }
}
/**
 * Adds vertices through both a traversal and the graph API on a sessionless client and
 * asserts they are visible to later requests. Skipped unless {@code assumeNeo4jIsPresent()}
 * passes.
 */
@Test
public void shouldExecuteSessionlessScriptOnTransactionalGraph() throws Exception {
    assumeNeo4jIsPresent();

    final Cluster cluster = TestClientFactory.open();
    final Client client = cluster.connect();

    // close the cluster in finally so a failed assertion does not leave it open
    try {
        // this line is important because it tests GraphTraversal which has a certain transactional path
        final Vertex vertexRequest1 = client.submit("g.addV().property(\"name\",\"stephen\")").all().get().get(0).getVertex();

        final Vertex vertexRequest2 = client.submit("graph.vertices().next()").all().get().get(0).getVertex();
        assertEquals(vertexRequest1.id(), vertexRequest2.id());

        // this line is important because it tests the other transactional path
        client.submit("graph.addVertex(\"name\",\"marko\")").all().get().get(0).getVertex();

        assertEquals(2, client.submit("g.V().count()").all().get().get(0).getLong());
    } finally {
        cluster.close();
    }
}
/**
 * Sends four requests on one session with differing bindings and asserts the computed sums,
 * which depend on bindings and variables from earlier requests being retained.
 */
@Test
public void shouldExecuteScriptInSessionWithBindingsSavedOnServerBetweenRequests() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    final Client client = cluster.connect(name.getMethodName());

    // close the cluster in finally so a failed assertion does not leave it open
    try {
        final Map<String, Object> bindings1 = new HashMap<>();
        bindings1.put("a", 100);
        bindings1.put("b", 200);
        final ResultSet results1 = client.submit("x = a + b", bindings1);
        assertEquals(300, results1.one().getInt());

        // only "b" is supplied; "x" and "a" come from the previous request
        final Map<String, Object> bindings2 = new HashMap<>();
        bindings2.put("b", 100);
        final ResultSet results2 = client.submit("x + b + a", bindings2);
        assertEquals(500, results2.one().getInt());

        // "x" is overridden by the request binding
        final Map<String, Object> bindings3 = new HashMap<>();
        bindings3.put("x", 100);
        final ResultSet results3 = client.submit("x + b + a + 1", bindings3);
        assertEquals(301, results3.one().getInt());

        // no bindings supplied; the expected 301 matches the previous request
        final Map<String, Object> bindings4 = new HashMap<>();
        final ResultSet results4 = client.submit("x + b + a + 1", bindings4);
        assertEquals(301, results4.one().getInt());
    } finally {
        cluster.close();
    }
}
/**
 * Opens three sessions with distinct ids, assigns a different {@code x} in each, and asserts
 * that follow-up expressions see the value assigned in their own session.
 */
@Test
public void shouldExecuteScriptsInMultipleSession() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    try {
        final Client sessionOne = cluster.connect(name.getMethodName() + "1");
        final Client sessionTwo = cluster.connect(name.getMethodName() + "2");
        final Client sessionThree = cluster.connect(name.getMethodName() + "3");

        // submit all three assignments before reading any of the results
        final ResultSet assignedOne = sessionOne.submit("x = 1");
        final ResultSet assignedTwo = sessionTwo.submit("x = 2");
        final ResultSet assignedThree = sessionThree.submit("x = 3");

        assertEquals(1, assignedOne.all().get().get(0).getInt());
        assertEquals(2, assignedTwo.all().get().get(0).getInt());
        assertEquals(3, assignedThree.all().get().get(0).getInt());

        // each expression uses the x of its own session
        final ResultSet computedOne = sessionOne.submit("x + 100");
        final ResultSet computedTwo = sessionTwo.submit("x * 2");
        final ResultSet computedThree = sessionThree.submit("x * 10");

        assertEquals(101, computedOne.all().get().get(0).getInt());
        assertEquals(4, computedTwo.all().get().get(0).getInt());
        assertEquals(30, computedThree.all().get().get(0).getInt());
    } finally {
        cluster.close();
    }
}
/**
 * Assigns {@code x} on three sessionless clients and asserts that a later request for
 * {@code x} on each of them fails because the variable is unknown.
 */
@Test
public void shouldNotHaveKnowledgeOfBindingsBetweenRequestsWhenSessionless() throws Exception {
    final Cluster cluster = TestClientFactory.open();

    // close the cluster in finally so a failed assertion does not leave it open
    try {
        final Client client1 = cluster.connect();
        final Client client2 = cluster.connect();
        final Client client3 = cluster.connect();

        final ResultSet results11 = client1.submit("x = 1");
        final ResultSet results21 = client2.submit("x = 2");
        final ResultSet results31 = client3.submit("x = 3");
        assertEquals(1, results11.all().get().get(0).getInt());
        assertEquals(2, results21.all().get().get(0).getInt());
        assertEquals(3, results31.all().get().get(0).getInt());

        assertVariableXIsNotBound(client1);
        assertVariableXIsNotBound(client2);
        assertVariableXIsNotBound(client3);
    } finally {
        cluster.close();
    }
}

/**
 * Submits {@code x} on the given client and asserts the request fails with a
 * {@code ResponseException} whose message reports the missing property.
 */
private void assertVariableXIsNotBound(final Client client) {
    try {
        client.submit("x").all().get();
        fail("The variable 'x' should not be present on the new request.");
    } catch (Exception ex) {
        final Throwable root = ExceptionHelper.getRootCause(ex);
        assertThat(root, instanceOf(ResponseException.class));
        assertThat(root.getMessage(), containsString("No such property: x for class"));
    }
}
/**
 * Submits 100 requests from 100 threads through a single shared client and asserts that
 * every request produced the expected result.
 */
@Test
public void shouldBeThreadSafeToUseOneClient() throws Exception {
    final int requestCount = 100;
    final Cluster cluster = TestClientFactory.build().workerPoolSize(2)
            .maxInProcessPerConnection(64)
            .minInProcessPerConnection(32)
            .maxConnectionPoolSize(16)
            .minConnectionPoolSize(8).create();

    // close the cluster in finally so a failed assertion does not leave it open
    try {
        final Client client = cluster.connect();

        final Map<Integer, Integer> results = new ConcurrentHashMap<>();
        final List<Thread> threads = new ArrayList<>();
        for (int ix = 0; ix < requestCount; ix++) {
            final int otherNum = ix;
            final Thread t = new Thread(()->{
                try {
                    results.put(otherNum, client.submit("1000+" + otherNum).all().get().get(0).getInt());
                } catch (Exception ex) {
                    // a failed request leaves no entry in results, which the assertions below detect
                    ex.printStackTrace();
                }
            }, name.getMethodName() + "-" + ix);

            t.start();
            threads.add(t);
        }

        threads.forEach(FunctionUtils.wrapConsumer(Thread::join));

        // Verify against the number of requests sent rather than results.size(); otherwise the
        // test passes vacuously when worker threads fail and record nothing.
        assertEquals(requestCount, results.size());
        for (int ix = 0; ix < requestCount; ix++) {
            assertThat(results.containsKey(ix), is(true));
            assertEquals(1000 + ix, results.get(ix).intValue());
        }
    } finally {
        cluster.close();
    }
}
/**
 * Asserts that an un-aliased request is rejected with
 * {@code REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS}.
 */
@Test
public void shouldRequireAliasedGraphVariablesInStrictTransactionMode() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    final Client unaliased = cluster.connect();
    try {
        unaliased.submit("1+1").all().get();
        fail("Should have tossed an exception because strict mode is on and no aliasing was performed");
    } catch (Exception thrown) {
        final Throwable rootCause = ExceptionHelper.getRootCause(thrown);
        assertThat(rootCause, instanceOf(ResponseException.class));

        final ResponseStatusCode statusCode = ((ResponseException) rootCause).getResponseStatusCode();
        assertEquals(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS, statusCode);
    } finally {
        cluster.close();
    }
}
/**
 * Asserts that an un-aliased request is rejected with
 * {@code REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS} and that the same kind of request succeeds
 * on a client aliased to "graph". Skipped unless {@code assumeNeo4jIsPresent()} passes.
 */
@Test
public void shouldAliasGraphVariablesInStrictTransactionMode() throws Exception {
    assumeNeo4jIsPresent();

    final Cluster cluster = TestClientFactory.open();
    final Client unaliased = cluster.connect();
    try {
        unaliased.submit("g.addVertex('name','stephen');").all().get().get(0).getVertex();
        fail("Should have tossed an exception because \"g\" does not have the addVertex method under default config");
    } catch (Exception thrown) {
        final Throwable rootCause = ExceptionHelper.getRootCause(thrown);
        assertThat(rootCause, instanceOf(ResponseException.class));

        final ResponseStatusCode statusCode = ((ResponseException) rootCause).getResponseStatusCode();
        assertEquals(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS, statusCode);

        // the positive half of the test runs only once the expected failure has been confirmed
        final Client aliasedToGraph = cluster.connect().alias("graph");
        final Vertex created = aliasedToGraph.submit("g.addVertex(T.label,'person')").all().get().get(0).getVertex();
        assertEquals("person", created.label());
    } finally {
        cluster.close();
    }
}
/**
 * Asserts that {@code g.addVertex(...)} fails with {@code SERVER_ERROR_EVALUATION} on an
 * un-aliased client and succeeds on a client aliased to "graph".
 */
@Test
public void shouldAliasGraphVariables() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    final Client unaliased = cluster.connect();
    try {
        unaliased.submit("g.addVertex(label,'person','name','stephen');").all().get().get(0).getVertex();
        fail("Should have tossed an exception because \"g\" does not have the addVertex method under default config");
    } catch (Exception thrown) {
        final Throwable rootCause = ExceptionHelper.getRootCause(thrown);
        assertThat(rootCause, instanceOf(ResponseException.class));

        final ResponseStatusCode statusCode = ((ResponseException) rootCause).getResponseStatusCode();
        assertEquals(ResponseStatusCode.SERVER_ERROR_EVALUATION, statusCode);

        // the positive half of the test runs only once the expected failure has been confirmed
        final Client aliasedToGraph = cluster.connect().alias("graph");
        final Vertex created = aliasedToGraph.submit("g.addVertex(label,'person','name','jason')").all().get().get(0).getVertex();
        assertEquals("person", created.label());
    } finally {
        cluster.close();
    }
}
/**
 * Asserts that {@code g.addV()} fails with {@code SERVER_ERROR_EVALUATION} on an un-aliased
 * client and succeeds once the client is aliased to "g1".
 */
@Test
public void shouldAliasTraversalSourceVariables() throws Exception {
    final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRYO_V3D0).create();
    final Client unaliased = cluster.connect();
    try {
        try {
            unaliased.submit("g.addV().property('name','stephen')").all().get().get(0).getVertex();
            fail("Should have tossed an exception because \"g\" is readonly in this context");
        } catch (Exception thrown) {
            final Throwable rootCause = ExceptionHelper.getRootCause(thrown);
            assertThat(rootCause, instanceOf(ResponseException.class));

            final ResponseStatusCode statusCode = ((ResponseException) rootCause).getResponseStatusCode();
            assertEquals(ResponseStatusCode.SERVER_ERROR_EVALUATION, statusCode);
        }

        // same script shape, now sent through the "g1" alias
        final Client aliasedToG1 = unaliased.alias("g1");
        final Vertex created = aliasedToG1.submit("g.addV().property('name','jason')").all().get().get(0).getVertex();
        assertEquals("jason", created.value("name"));
    } finally {
        cluster.close();
    }
}
/**
 * In a session, asserts that {@code g.addVertex(...)} fails with
 * {@code SERVER_ERROR_EVALUATION}, then reconnects to the same session id with an alias to
 * "graph" and asserts the call succeeds.
 */
@Test
public void shouldAliasGraphVariablesInSession() throws Exception {
    final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRYO_V3D0).create();

    // one try/finally around the whole test so an assertion failure in the first half
    // still closes the cluster
    try {
        final Client client = cluster.connect(name.getMethodName());

        try {
            client.submit("g.addVertex('name','stephen');").all().get().get(0).getVertex();
            fail("Should have tossed an exception because \"g\" does not have the addVertex method under default config");
        } catch (Exception ex) {
            final Throwable root = ExceptionHelper.getRootCause(ex);
            assertThat(root, instanceOf(ResponseException.class));
            final ResponseException re = (ResponseException) root;
            assertEquals(ResponseStatusCode.SERVER_ERROR_EVALUATION, re.getResponseStatusCode());
            client.close();
        }

        final Client aliased = cluster.connect(name.getMethodName()).alias("graph");
        assertEquals("jason", aliased.submit("n='jason'").all().get().get(0).getString());
        final Vertex v = aliased.submit("g.addVertex('name',n)").all().get().get(0).getVertex();
        assertEquals("jason", v.value("name"));
    } finally {
        cluster.close();
    }
}
/**
 * In a session, asserts that {@code g.addV()} fails with {@code SERVER_ERROR_EVALUATION} and
 * that it succeeds once the client is aliased to "g1", using a variable defined in the session.
 */
@Test
public void shouldAliasTraversalSourceVariablesInSession() throws Exception {
    final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRYO_V3D0).create();
    final Client client = cluster.connect(name.getMethodName());

    // close the cluster in finally so a failed assertion does not leave it open
    try {
        try {
            client.submit("g.addV().property('name','stephen')").all().get().get(0).getVertex();
            fail("Should have tossed an exception because \"g\" is readonly in this context");
        } catch (Exception ex) {
            final Throwable root = ExceptionHelper.getRootCause(ex);
            assertThat(root, instanceOf(ResponseException.class));
            final ResponseException re = (ResponseException) root;
            assertEquals(ResponseStatusCode.SERVER_ERROR_EVALUATION, re.getResponseStatusCode());
        }

        final Client clientAliased = client.alias("g1");
        assertEquals("jason", clientAliased.submit("n='jason'").all().get().get(0).getString());
        final Vertex v = clientAliased.submit("g.addV().property('name',n)").all().get().get(0).getVertex();
        assertEquals("jason", v.value("name"));
    } finally {
        cluster.close();
    }
}
/**
 * Compares a session created with the managed-transaction flag against one without it:
 * writes in the former are expected to be visible to other clients immediately, writes in the
 * latter only after an explicit commit. Skipped unless {@code assumeNeo4jIsPresent()} passes.
 */
@Test
public void shouldManageTransactionsInSession() throws Exception {
    assumeNeo4jIsPresent();

    final Cluster cluster = TestClientFactory.open();

    // close the cluster in finally so a failed assertion does not leave it open
    try {
        final Client client = cluster.connect();
        final Client sessionWithManagedTx = cluster.connect(name.getMethodName() + "-managed", true);
        final Client sessionWithoutManagedTx = cluster.connect(name.getMethodName() + "-not-managed");

        // this should auto-commit
        sessionWithManagedTx.submit("v = g.addV().property('name','stephen').next()").all().get().get(0).getVertex();

        // the other clients should see that change because of auto-commit
        assertThat(client.submit("g.V().has('name','stephen').hasNext()").all().get().get(0).getBoolean(), is(true));
        assertThat(sessionWithoutManagedTx.submit("g.V().has('name','stephen').hasNext()").all().get().get(0).getBoolean(), is(true));

        // this should NOT auto-commit
        final Vertex vDaniel = sessionWithoutManagedTx.submit("v = g.addV().property('name','daniel').next()").all().get().get(0).getVertex();

        // the other clients should NOT see that change because it has not been committed yet
        assertThat(client.submit("g.V().has('name','daniel').hasNext()").all().get().get(0).getBoolean(), is(false));
        assertThat(sessionWithManagedTx.submit("g.V().has('name','daniel').hasNext()").all().get().get(0).getBoolean(), is(false));

        // but "v" should still be there
        final Vertex vDanielAgain = sessionWithoutManagedTx.submit("v").all().get().get(0).getVertex();
        assertEquals(vDaniel.id(), vDanielAgain.id());

        // now commit manually
        sessionWithoutManagedTx.submit("g.tx().commit()").all().get();

        // should be there for all now
        assertThat(client.submit("g.V().has('name','daniel').hasNext()").all().get().get(0).getBoolean(), is(true));
        assertThat(sessionWithManagedTx.submit("g.V().has('name','daniel').hasNext()").all().get().get(0).getBoolean(), is(true));
        assertThat(sessionWithoutManagedTx.submit("g.V().has('name','daniel').hasNext()").all().get().get(0).getBoolean(), is(true));
    } finally {
        cluster.close();
    }
}
/**
 * Repeatedly queues four blocking scripts on one session (with state maintained after
 * exceptions) and asserts, via {@code assertFutureTimeout}, that each of them ends in a
 * timeout response.
 */
@Test
public void shouldProcessSessionRequestsInOrderAfterTimeout() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    try {
        // this configures the client to behave like OpProcessor for UnifiedChannelizer
        final Client.SessionSettings sessionSettings = Client.SessionSettings.build()
                .sessionId(name.getMethodName())
                .maintainStateAfterException(true)
                .create();
        final Client sessionClient = cluster.connect(Client.Settings.build().useSession(sessionSettings).create());

        for (int round = 0; round < 50; round++) {
            // all four scripts wait on a monitor; they are submitted before any result is requested
            final CompletableFuture<ResultSet> submittedOne = sessionClient.submitAsync(
                    "Object mon1 = 'mon1';\n" +
                    "synchronized (mon1) {\n" +
                    "    mon1.wait();\n" +
                    "} ");
            final CompletableFuture<ResultSet> submittedTwo = sessionClient.submitAsync(
                    "Object mon2 = 'mon2';\n" +
                    "synchronized (mon2) {\n" +
                    "    mon2.wait();\n" +
                    "}");
            final CompletableFuture<ResultSet> submittedThree = sessionClient.submitAsync(
                    "Object mon3 = 'mon3';\n" +
                    "synchronized (mon3) {\n" +
                    "    mon3.wait();\n" +
                    "}");
            final CompletableFuture<ResultSet> submittedFour = sessionClient.submitAsync(
                    "Object mon4 = 'mon4';\n" +
                    "synchronized (mon4) {\n" +
                    "    mon4.wait();\n" +
                    "}");

            // obtain every result future first, then check them in submission order
            final CompletableFuture<List<Result>> pendingOne = submittedOne.get().all();
            final CompletableFuture<List<Result>> pendingTwo = submittedTwo.get().all();
            final CompletableFuture<List<Result>> pendingThree = submittedThree.get().all();
            final CompletableFuture<List<Result>> pendingFour = submittedFour.get().all();

            assertFutureTimeout(pendingOne);
            assertFutureTimeout(pendingTwo);
            assertFutureTimeout(pendingThree);
            assertFutureTimeout(pendingFour);
        }
    } finally {
        cluster.close();
    }
}
/**
 * Asserts that waiting on the given future fails with a {@code ResponseException} carrying
 * {@code SERVER_ERROR_TIMEOUT} and a message that starts with "Evaluation exceeded" and
 * mentions "250 ms".
 *
 * @param f the result future expected to complete exceptionally
 */
private void assertFutureTimeout(final CompletableFuture<List<Result>> f) {
    try {
        f.get();
        fail("Should have timed out");
    } catch (Exception thrown) {
        final Throwable rootCause = ExceptionHelper.getRootCause(thrown);
        assertThat(rootCause, instanceOf(ResponseException.class));

        final ResponseException timeoutResponse = (ResponseException) rootCause;
        assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, timeoutResponse.getResponseStatusCode());

        final String message = rootCause.getMessage();
        assertThat(message, allOf(startsWith("Evaluation exceeded"), containsString("250 ms")));
    }
}
/**
 * Creates several clients (sessionless and in-session, used and unused, one closed early),
 * closes the cluster, and asserts that every client then rejects submissions with
 * "Client is closed". Also checks that closing clients and the cluster again is permitted.
 */
@Test
public void shouldCloseAllClientsOnCloseOfCluster() throws Exception {
    final Cluster cluster = TestClientFactory.open();

    // try/finally so the cluster is closed even when one of the early assertions fails;
    // the repeated cluster.close() is harmless, as this test itself closes it twice
    try {
        final Client sessionlessOne = cluster.connect();
        final Client session = cluster.connect("session");
        final Client sessionlessTwo = cluster.connect();
        final Client sessionlessThree = cluster.connect();
        final Client sessionlessFour = cluster.connect();

        assertEquals(2, sessionlessOne.submit("1+1").all().get().get(0).getInt());
        assertEquals(2, session.submit("1+1").all().get().get(0).getInt());
        assertEquals(2, sessionlessTwo.submit("1+1").all().get().get(0).getInt());
        assertEquals(2, sessionlessThree.submit("1+1").all().get().get(0).getInt());
        // don't send anything on the 4th client

        // close one of these Clients before the Cluster
        sessionlessThree.close();
        cluster.close();

        assertSubmitFailsBecauseClientIsClosed(sessionlessOne);
        assertSubmitFailsBecauseClientIsClosed(session);
        assertSubmitFailsBecauseClientIsClosed(sessionlessTwo);
        assertSubmitFailsBecauseClientIsClosed(sessionlessThree);
        assertSubmitFailsBecauseClientIsClosed(sessionlessFour);

        // allow call to close() even though closed through cluster
        sessionlessOne.close();
        session.close();
        sessionlessTwo.close();
    } finally {
        cluster.close();
    }
}

/**
 * Submits {@code 1+1} on the given client and asserts the request fails with an
 * {@code IllegalStateException} whose message is "Client is closed".
 */
private void assertSubmitFailsBecauseClientIsClosed(final Client client) {
    try {
        client.submit("1+1").all().get();
        fail("Should have tossed an exception because cluster was closed");
    } catch (Exception ex) {
        final Throwable root = ExceptionHelper.getRootCause(ex);
        assertThat(root, instanceOf(IllegalStateException.class));
        assertEquals("Client is closed", root.getMessage());
    }
}
/**
 * Verifies that a user agent supplied through {@link RequestOptions} ends up in the arguments of the
 * {@link RequestMessage} handed to {@code submitAsync}, stored under {@link Tokens#ARGS_USER_AGENT}.
 *
 * @throws Exception if the request cannot be submitted or its result cannot be retrieved
 */
@Test
public void shouldSendUserAgent() throws Exception {
    final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V3D0).create();
    final Client client = Mockito.spy(cluster.connect().alias("g"));
    try {
        client.submit("", RequestOptions.build().userAgent("test").create()).all().get();
    } finally {
        // close even when the request fails so that a failing test does not leave the cluster open
        cluster.close();
    }

    // the spy recorded the message passed to submitAsync; inspect it for the user agent argument
    final ArgumentCaptor<RequestMessage> requestMessageCaptor = ArgumentCaptor.forClass(RequestMessage.class);
    verify(client).submitAsync(requestMessageCaptor.capture());
    final RequestMessage requestMessage = requestMessageCaptor.getValue();
    assertEquals("test", requestMessage.getArgs().get(Tokens.ARGS_USER_AGENT));
}
/**
 * Verifies that a user agent set on a traversal with {@code g.with(Tokens.ARGS_USER_AGENT, ...)} is visible
 * both in the {@link RequestOptions} passed to the bytecode {@code submitAsync} overload and in the arguments
 * of the {@link RequestMessage} passed to the message {@code submitAsync} overload.
 */
@Test
public void shouldSendUserAgentBytecode() {
    final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V3D0).create();
    final Client client = Mockito.spy(cluster.connect().alias("g"));
    // NOTE(review): presumably stubbed so that the remote connection keeps talking to the spy when it
    // aliases the client - confirm against DriverRemoteConnection
    Mockito.when(client.alias("g")).thenReturn(client);
    try {
        final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(client));
        g.with(Tokens.ARGS_USER_AGENT, "test").V().iterate();
    } finally {
        // close even when the traversal fails so that a failing test does not leave the cluster open
        cluster.close();
    }

    // first check the options that accompanied the bytecode submission
    final ArgumentCaptor<RequestOptions> requestOptionsCaptor = ArgumentCaptor.forClass(RequestOptions.class);
    verify(client).submitAsync(Mockito.any(Bytecode.class), requestOptionsCaptor.capture());
    final RequestOptions requestOptions = requestOptionsCaptor.getValue();
    assertEquals("test", requestOptions.getUserAgent().get());

    // then check the request message that was built from them
    final ArgumentCaptor<RequestMessage> requestMessageCaptor = ArgumentCaptor.forClass(RequestMessage.class);
    verify(client).submitAsync(requestMessageCaptor.capture());
    final RequestMessage requestMessage = requestMessageCaptor.getValue();
    assertEquals("test", requestMessage.getArgs().getOrDefault(Tokens.ARGS_USER_AGENT, null));
}
/**
 * Verifies that a request id set on a traversal with {@code g.with(Tokens.REQUEST_ID, ...)} is visible both
 * as the override request id of the {@link RequestOptions} passed to the bytecode {@code submitAsync} overload
 * and as the request id of the {@link RequestMessage} passed to the message {@code submitAsync} overload.
 */
@Test
public void shouldSendRequestIdBytecode() {
    final UUID overrideRequestId = UUID.randomUUID();
    final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V3D0).create();
    final Client client = Mockito.spy(cluster.connect().alias("g"));
    // NOTE(review): presumably stubbed so that the remote connection keeps talking to the spy when it
    // aliases the client - confirm against DriverRemoteConnection
    Mockito.when(client.alias("g")).thenReturn(client);
    try {
        final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(client));
        g.with(Tokens.REQUEST_ID, overrideRequestId).V().iterate();
    } finally {
        // close even when the traversal fails so that a failing test does not leave the cluster open
        cluster.close();
    }

    // first check the options that accompanied the bytecode submission
    final ArgumentCaptor<RequestOptions> requestOptionsCaptor = ArgumentCaptor.forClass(RequestOptions.class);
    verify(client).submitAsync(Mockito.any(Bytecode.class), requestOptionsCaptor.capture());
    final RequestOptions requestOptions = requestOptionsCaptor.getValue();
    assertTrue(requestOptions.getOverrideRequestId().isPresent());
    assertEquals(overrideRequestId, requestOptions.getOverrideRequestId().get());

    // then check the request message that was built from them
    final ArgumentCaptor<RequestMessage> requestMessageCaptor = ArgumentCaptor.forClass(RequestMessage.class);
    verify(client).submitAsync(requestMessageCaptor.capture());
    final RequestMessage requestMessage = requestMessageCaptor.getValue();
    assertEquals(overrideRequestId, requestMessage.getRequestId());
}
/**
 * Opens a {@link Cluster} from {@code TestClientFactory.RESOURCE_PATH}, checks that an instance was returned
 * and closes it again.
 *
 * @throws Exception if the cluster cannot be opened from the configured path
 */
@Test
public void shouldClusterReadFileFromResources() throws Exception {
    final Cluster fromResources = Cluster.open(TestClientFactory.RESOURCE_PATH);

    assertNotNull(fromResources);

    fromResources.close();
}
/**
 * Submits a slow script, then re-uses its request id for a second request while the first is still pending.
 * The second request must be rejected with a "There is already a request pending with an id of:" message and
 * the first request must still complete with its own result of {@code 100}.
 *
 * @throws Exception if the test is interrupted while sleeping
 */
@Test
public void shouldNotHangWhenSameRequestIdIsUsed() throws Exception {
    final Cluster cluster = TestClientFactory.build().maxConnectionPoolSize(1).minConnectionPoolSize(1).create();
    final Client client = cluster.connect();
    final UUID sharedId = UUID.randomUUID();

    // the first request sleeps on the server so that it is still pending when the id is re-used
    final RequestOptions slowOptions = RequestOptions.build().overrideRequestId(sharedId).create();
    final Future<ResultSet> slowResult = client.submitAsync("Thread.sleep(2000);100", slowOptions);

    // give the server a moment to start working on the first request
    Thread.sleep(100);

    try {
        // same id again - this one is expected to be rejected
        final RequestOptions duplicateOptions = RequestOptions.build().overrideRequestId(sharedId).create();
        client.submit("1+1+97", duplicateOptions);
        fail("Request should not have been sent due to duplicate id");
    } catch (Exception ex) {
        // the rejection is identified by the message of the root cause
        final String rootMessage = ExceptionHelper.getRootCause(ex).getMessage();
        assertThat(rootMessage, startsWith("There is already a request pending with an id of:"));

        // the original request must be unaffected by the rejected duplicate
        final int slowValue = slowResult.get().one().getInt();
        assertEquals(100, slowValue);
    } finally {
        cluster.close();
    }
}
/**
 * Client created on an initially dead host should fail initially, and recover after the dead host has restarted.
 * <p>
 * The server is stopped before the client is created, two requests are expected to fail, the server is started
 * again and the request is then retried up to ten times until it succeeds.
 *
 * @param testClusterClient {@code true} to test the clustered client, {@code false} to test the sessioned
 *                          client (created with the session name "sessionClient")
 * @throws Exception if an unexpected checked exception escapes one of the requests or the test is interrupted
 */
private void testShouldFailOnInitiallyDeadHost(final boolean testClusterClient) throws Exception {
    logger.info("Stopping server.");
    this.stopServer();

    final Cluster cluster = TestClientFactory.build().create();
    final Client client = testClusterClient ? cluster.connect() : cluster.connect("sessionClient");

    try {
        // try to re-issue a request now that the server is down
        logger.info("Verifying driver cannot connect to server.");
        client.submit("g").all().get(500, TimeUnit.MILLISECONDS);
        fail("Should throw an exception.");
    } catch (Exception re) {
        // Client would have no active connections to the host, hence it would encounter a timeout
        // trying to find an alive connection to the host.
        assertThat(re, instanceOf(NoHostAvailableException.class));

        try {
            client.submit("1+1").all().get(3000, TimeUnit.MILLISECONDS);
            fail("Should throw exception on the retry");
        } catch (RuntimeException re2) {
            // the two client types wrap the connection failure at different depths
            if (client instanceof Client.SessionedClient) {
                assertThat(re2.getCause().getCause(), instanceOf(ConnectionException.class));
            } else {
                assertThat(re2.getCause().getCause().getCause(), instanceOf(ConnectException.class));
            }
        }

        //
        // should recover when the server comes back
        //

        // restart server
        logger.info("Restarting server.");
        this.startServer();

        // try a bunch of times to reconnect. on slower systems this may simply take longer...looking at you travis
        for (int ix = 1; ix < 11; ix++) {
            // the retry interval is 1 second, wait a bit longer
            TimeUnit.MILLISECONDS.sleep(1250);

            try {
                // ix is already 1-based, so it is the attempt number as-is
                logger.info(String.format("Connecting driver to server - attempt # %s. ", ix));
                final List<Result> results = client.submit("1+1").all().get(3000, TimeUnit.MILLISECONDS);
                assertEquals(1, results.size());
                assertEquals(2, results.get(0).getInt());
                logger.info("Connection successful.");
                break;
            } catch (Exception ex) {
                // only give up once the last attempt has failed
                if (ix == 10) {
                    fail("Should have eventually succeeded");
                }
            }
        }
    } finally {
        cluster.close();
    }
}
/**
 * Runs {@link #testShouldFailOnInitiallyDeadHost(boolean)} for the clustered client.
 *
 * @throws Exception propagated from the shared scenario
 */
@Test
public void shouldFailOnInitiallyDeadHostForClusterClient() throws Exception {
    final boolean useClusterClient = true;
    testShouldFailOnInitiallyDeadHost(useClusterClient);
}
/**
 * Runs {@link #testShouldFailOnInitiallyDeadHost(boolean)} for the sessioned client.
 *
 * @throws Exception propagated from the shared scenario
 */
@Test
public void shouldFailOnInitiallyDeadHostForSessionClient() throws Exception {
    final boolean useClusterClient = false;
    testShouldFailOnInitiallyDeadHost(useClusterClient);
}
/**
 * Checks that a client recovers after its server goes away and comes back: a request succeeds, the server is
 * stopped, two requests are expected to fail with a {@link TimeoutException} as root cause, a background
 * thread then fires 10000 asynchronous requests while the server is started again, and finally a request is
 * retried until it succeeds.
 * <p>
 * The test is currently ignored.
 *
 * @throws Exception if the test is interrupted while sleeping or waiting on the latch
 */
@Test
@org.junit.Ignore
public void shouldEventuallySucceedAfterHostBecomesUnavailable() throws Exception {
    final Cluster cluster = TestClientFactory.build().minConnectionPoolSize(4).maxConnectionPoolSize(4).
            maxWaitForConnection(4000).validationRequest("g.inject()").create();
    try {
        final Client client = cluster.connect();
        client.init();

        // get an initial connection which marks the host as available
        assertEquals(2, client.submit("1+1").all().join().get(0).getInt());

        // no more server - bye!
        stopServer();

        // send another request which will mark the host unavailable, where it times out
        try {
            client.submit("1+1").all().join().get(0).getInt();
            fail("Should not have gone through because the server is not running anymore");
        } catch (Exception i) {
            final Throwable t = ExceptionHelper.getRootCause(i);
            assertThat(t, instanceOf(TimeoutException.class));
        }

        // and a second one, which is expected to time out in the same way
        try {
            client.submit("1+1").all().join().get(0).getInt();
            fail("Should not have gone through because the server is not running anymore");
        } catch (Exception i) {
            final Throwable t = ExceptionHelper.getRootCause(i);
            assertThat(t, instanceOf(TimeoutException.class));
        }

        // load up a hella ton of requests
        final AtomicInteger nhaFailures = new AtomicInteger(0);
        final int requests = 10000;
        final CountDownLatch latch = new CountDownLatch(requests);
        new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (Exception ignored) {}

            IntStream.range(0, requests).forEach(i -> {
                try {
                    client.submitAsync("1 + " + i);

                    // pause for a second after every 100th request
                    if (i % 100 == 0) {
                        Thread.sleep(1000);
                    }
                } catch (Exception ex) {
                    // count the expected failure type and just log anything else
                    if (ex instanceof NoHostAvailableException) {
                        nhaFailures.incrementAndGet();
                    } else {
                        logger.warn("Expecting {} typically but got a {} with message: {}",
                                NoHostAvailableException.class.getSimpleName(),
                                ex.getClass().getName(), ex.getMessage());
                    }
                } finally {
                    // one count per request regardless of its outcome
                    latch.countDown();
                }
            });
        }).start();

        startServerAsync();

        // default reconnect time is 1 second so wait some extra time to be sure it has time to try to bring it
        // back to life. usually this passes on the first attempt, but docker is sometimes slow and we get failures
        // waiting for Gremlin Server to pop back up
        boolean eventuallyWorked = false;
        for (int ix = 3; ix < 33; ix++) {
            // ix doubles as the number of seconds to wait before this attempt
            TimeUnit.SECONDS.sleep(ix);
            try {
                final int result = client.submit("1+1").all().join().get(0).getInt();
                assertEquals(2, result);
                eventuallyWorked = true;
                break;
            } catch (Exception ignored) {
                logger.warn("Attempt {} failed on shouldEventuallySucceedAfterHostBecomesUnavailable", ix);
            }
        }

        assertThat(eventuallyWorked, is(true));

        // give the background thread up to 90 seconds to finish; the outcome of the wait is not asserted
        latch.await(90000, TimeUnit.MILLISECONDS);
        logger.warn("Ended with {}", nhaFailures.get());
    } finally {
        // close even when an assertion above fails so that a failing test does not leave the cluster open
        cluster.close();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment