Skip to content

Instantly share code, notes, and snippets.

@eastlondoner
Created February 3, 2021 13:20
Show Gist options
  • Save eastlondoner/cc836d094247688b44bb2cf741215883 to your computer and use it in GitHub Desktop.
Save eastlondoner/cc836d094247688b44bb2cf741215883 to your computer and use it in GitHub Desktop.
Running independent async queries using neo4j java driver
/**
* Creates a disposable async session and uses it to perform the provided unit of work.
* Ensures that the provided work is committed successfully and the async session has been closed before the returned completion stage completes.
* @param driver Driver to use
* @param sessionConfig Session Config to use when creating the async session (can be used to pass in a bookmark)
* @param isWrite Is this a write transaction (will be sent as a read transaction if false)
* @param work Unit of transaction work
* @param <U> Type returned by the transaction work
* @return A completion stage that is completed successfully when the session has been closed - either with an error or with the result of the UoW
*/
private static <U> CompletionStage<U> runAsyncTransaction( Driver driver, SessionConfig sessionConfig, boolean isWrite,
AsyncTransactionWork<CompletionStage<U>> work )
{
final var asyncSession = driver.asyncSession( sessionConfig );
final Function<AsyncTransactionWork<CompletionStage<U>>,CompletionStage<U>> sessionTransactionFunction =
isWrite ? asyncSession::writeTransactionAsync : asyncSession::readTransactionAsync;
return sessionTransactionFunction
.apply(
tx -> work.execute( tx )
.thenCompose(
r -> tx.commitAsync().thenApply( ignored -> r )
)
)
.handle( closeSessionAndRethrowExceptions( asyncSession ) )
.thenCompose( i -> i );
}
private static <U> BiFunction<U,Throwable,CompletionStage<U>> closeSessionAndRethrowExceptions( AsyncSession session )
{
return ( originalResult, originalException ) ->
session.closeAsync()
.handle( ( ignored, sessionCloseException ) ->
{
if ( sessionCloseException != null && originalException != null )
{
sessionCloseException.addSuppressed( originalException );
throw new RuntimeException( sessionCloseException );
}
else if ( sessionCloseException != null )
{
throw new RuntimeException( sessionCloseException );
}
else if ( originalException != null )
{
throw new RuntimeException( originalException );
}
return originalResult;
}
);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment