Created
February 3, 2021 13:20
-
-
Save eastlondoner/cc836d094247688b44bb2cf741215883 to your computer and use it in GitHub Desktop.
Running independent async queries using neo4j java driver
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Creates a disposable async session and uses it to perform the provided unit of work. | |
| * Ensures that the provided work is committed successfully and the async session has been closed before the returned completion stage completes. | |
| * @param driver Driver to use | |
| * @param sessionConfig Session Config to use when creating the async session (can be used to pass in a bookmark) | |
| * @param isWrite Is this a write transaction (will be sent as a read transaction if false) | |
| * @param work Unit of transaction work | |
| * @param <U> Type returned by the transaction work | |
| * @return A completion stage that is completed successfully when the session has been closed - either with an error or with the result of the UoW | |
| */ | |
| private static <U> CompletionStage<U> runAsyncTransaction( Driver driver, SessionConfig sessionConfig, boolean isWrite, | |
| AsyncTransactionWork<CompletionStage<U>> work ) | |
| { | |
| final var asyncSession = driver.asyncSession( sessionConfig ); | |
| final Function<AsyncTransactionWork<CompletionStage<U>>,CompletionStage<U>> sessionTransactionFunction = | |
| isWrite ? asyncSession::writeTransactionAsync : asyncSession::readTransactionAsync; | |
| return sessionTransactionFunction | |
| .apply( | |
| tx -> work.execute( tx ) | |
| .thenCompose( | |
| r -> tx.commitAsync().thenApply( ignored -> r ) | |
| ) | |
| ) | |
| .handle( closeSessionAndRethrowExceptions( asyncSession ) ) | |
| .thenCompose( i -> i ); | |
| } | |
| private static <U> BiFunction<U,Throwable,CompletionStage<U>> closeSessionAndRethrowExceptions( AsyncSession session ) | |
| { | |
| return ( originalResult, originalException ) -> | |
| session.closeAsync() | |
| .handle( ( ignored, sessionCloseException ) -> | |
| { | |
| if ( sessionCloseException != null && originalException != null ) | |
| { | |
| sessionCloseException.addSuppressed( originalException ); | |
| throw new RuntimeException( sessionCloseException ); | |
| } | |
| else if ( sessionCloseException != null ) | |
| { | |
| throw new RuntimeException( sessionCloseException ); | |
| } | |
| else if ( originalException != null ) | |
| { | |
| throw new RuntimeException( originalException ); | |
| } | |
| return originalResult; | |
| } | |
| ); | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment