Last active
December 15, 2015 20:09
-
-
Save ben-manes/5316351 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* A test for the connection life-cycle management within jOOQ managed operations. | |
* | |
* @author Ben Manes ([email protected]) | |
*/ | |
@Guice(modules = {JooqTestModule.class, TestModule.class}) | |
public class ConnectionLifecycleTest { | |
final AtomicInteger counter = new AtomicInteger(); | |
@Inject CountingExecuteListener countingExecuteListener; | |
@Inject CountingDataSource countingDataSource; | |
@Inject TransactionOperations template; | |
@Inject Executor db; | |
@BeforeMethod | |
public void beforeMethod() { | |
countingDataSource.reset(); | |
countingExecuteListener.reset(); | |
} | |
@AfterMethod | |
public void afterMethod() { | |
db.truncate(USER).execute(); | |
} | |
@Test | |
public void perExecutorStatement() { | |
insertUser("John", "Doe"); | |
insertUser("Jane", "Doe"); | |
assertThat(countingDataSource.getNumberOfEstablishedConnections(), is(2)); | |
assertThat(countingExecuteListener.getStartCount(), is(4)); | |
assertThat(countingExecuteListener.getEndCount(), is(4)); | |
assertThat(isInTransaction(), is(false)); | |
} | |
@Test(enabled = false) | |
public void perExecutorStatement_lazy() { | |
// FIXME(ben): An implicitly lazy operation does not indicate to the execute listeners that | |
// the execution has completed. This results in connection leak. | |
db.selectCount().from(USER).fetchOne(); | |
assertThat(countingDataSource.getNumberOfEstablishedConnections(), is(1)); | |
assertThat(countingExecuteListener.getStartCount(), is(1)); | |
assertThat(countingExecuteListener.getEndCount(), is(1)); | |
assertThat(isInTransaction(), is(false)); | |
} | |
@Test | |
public void perExecutorStatement_rollback() throws Throwable { | |
try { | |
UserRecord record = newUserRecord("John", "Doe"); | |
db.batchInsert(ImmutableList.of(record, record)).execute(); | |
Assert.fail(); | |
} catch (DataAccessException e) { | |
assertThat(countingDataSource.getNumberOfEstablishedConnections(), is(1)); | |
assertThat(countingExecuteListener.getStartCount(), is(3)); | |
assertThat(countingExecuteListener.getEndCount(), is(3)); | |
assertThat(isInTransaction(), is(false)); | |
assertThat(count(), is(0)); | |
assertThat(isInTransaction(), is(false)); | |
} | |
} | |
@Test(threadPoolSize = 2, invocationCount = 50) | |
public void concurrent() { | |
insertUser("John", "Doe-" + counter.incrementAndGet()); | |
} | |
@Test(threadPoolSize = 2, invocationCount = 50) | |
public void concurrent_rollback() throws Throwable { | |
try { | |
insertUser("John", null); | |
Assert.fail(); | |
} catch (DataAccessException e) { | |
rethrowIfNotConstraintViolation(e); | |
} | |
} | |
@Test | |
public void transactional() { | |
template.execute(new TransactionCallbackWithoutResult() { | |
@Override protected void doInTransactionWithoutResult(TransactionStatus status) { | |
insertUser("John", "Doe"); | |
insertUser("Jane", "Doe"); | |
assertThat(isInTransaction(), is(true)); | |
} | |
}); | |
assertThat(countingDataSource.getNumberOfEstablishedConnections(), is(1)); | |
assertThat(countingExecuteListener.getStartCount(), is(4)); | |
assertThat(countingExecuteListener.getEndCount(), is(4)); | |
assertThat(isInTransaction(), is(false)); | |
} | |
@Test | |
public void transactional_rollback() { | |
try { | |
template.execute(new TransactionCallbackWithoutResult() { | |
@Override protected void doInTransactionWithoutResult(TransactionStatus status) { | |
insertUser("John", "Doe"); | |
insertUser("John", "Doe"); | |
assertThat(isInTransaction(), is(true)); | |
} | |
}); | |
Assert.fail(); | |
} catch (DataAccessException e) { | |
assertThat(count(), is(0)); | |
} | |
} | |
/** | |
* Inserts a new user and fetches back the record in a single executor statement. This translates | |
* into multiple SQL operations where two executor life cycles are triggered, with the insert | |
* wrapping over the fetch's execution life cycle. | |
*/ | |
private void insertUser(String firstName, String lastName) { | |
db.insertInto(USER).set(newUserRecord(firstName, lastName)).returning().fetchOne(); | |
} | |
private UserRecord newUserRecord(String firstName, String lastName) { | |
UserRecord record = new UserRecord(); | |
record.setFirstName(firstName); | |
record.setLastName(lastName); | |
return record; | |
} | |
private int count() { | |
return db.selectCount().from(USER).fetch().get(0).value1(); | |
} | |
private boolean isInTransaction() { | |
return TransactionSynchronizationManager.isSynchronizationActive(); | |
} | |
private void rethrowIfNotConstraintViolation(DataAccessException e) throws Throwable { | |
Throwable cause = e.getCause(); | |
boolean expected = (cause instanceof SQLException) && cause.getMessage().contains("NULL"); | |
if (!expected) { | |
throw cause; | |
} | |
} | |
public static final class TestModule extends AbstractModule { | |
@Override protected void configure() { | |
bind(DelegatingDataSource.class).to(CountingDataSource.class); | |
Multibinder.newSetBinder(binder(), ExecuteListener.class) | |
.addBinding().to(CountingExecuteListener.class); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* A connection provider for a transactionally aware {@link DataSource}. | |
* | |
* @author Ben Manes ([email protected]) | |
*/ | |
final class TransactionAwareConnectionProvider implements ConnectionProvider { | |
private final DataSource dataSource; | |
@Inject | |
public TransactionAwareConnectionProvider(DataSource dataSource) { | |
this.dataSource = dataSource; | |
} | |
@Override | |
public Connection acquire() { | |
try { | |
return dataSource.getConnection(); | |
} catch (SQLException e) { | |
throw new DataAccessException("Error getting connection from data source " + dataSource, e); | |
} | |
} | |
@Override | |
public void release(Connection released) { | |
checkArgument(DataSourceUtils.isConnectionTransactional(released, dataSource), | |
"Expected the connection to be managed by the current thread's transaction"); | |
try { | |
DataSourceUtils.doReleaseConnection(released, dataSource); | |
} catch (SQLException e) { | |
throw new DataAccessException("Error closing connection " + released, e); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* An execution listener that helps manages the connection's life-cycle. | |
* <p> | |
* jOOQ does not manage a connection's life-cycle (commit, rollback, etc). The default connection | |
* behavior require stateful executors so that the application logic can manage the connection. If | |
* the application is not careful then multi-threaded usage may cause a connection leak. As an | |
* operation may fetch a database connection multiple times during its execution, the same | |
* underlying connection must be used. | |
* <p> | |
* If a transaction is active then Spring will manage the connection in a thread local and return | |
* the same instance throughout multiple operations. This allows multiple statements and nested | |
* transactions to be committed or rolled back as an atomic unit. This behavior is leveraged to | |
* to ensure that jOOQ's {@link Executor} is stateless by ensuring that all executions are | |
* transactional, even in the absence of an application-defined transaction. | |
* | |
* @author Ben Manes ([email protected]) | |
*/ | |
final class TransactionAwareExecutionListener extends DefaultExecuteListener { | |
private static final Logger logger = Logger.getLogger(TransactionAwareExecutionListener.class); | |
private static final ThreadLocal<TransactionStatus> transactionStatus = | |
new ThreadLocal<TransactionStatus>(); | |
private static final ThreadLocal<Integer> nestedExecutions = new ThreadLocal<Integer>() { | |
@Override public Integer initialValue() { | |
return 0; | |
} | |
}; | |
private static final long serialVersionUID = 1L; | |
private final PlatformTransactionManager transactionManager; | |
private final TransactionDefinition definition; | |
@Inject | |
public TransactionAwareExecutionListener(PlatformTransactionManager transactionManager) { | |
this.definition = new DefaultTransactionDefinition(); | |
this.transactionManager = transactionManager; | |
} | |
@Override | |
public void start(ExecuteContext context) { | |
incrementNestedExecutions(); | |
if (!isInTransaction()) { | |
// Begin a transaction so that execution uses a managed connection | |
TransactionStatus status = transactionManager.getTransaction(definition); | |
transactionStatus.set(status); | |
} | |
} | |
@Override | |
public void end(ExecuteContext context) { | |
decrementNestedExecutions(); | |
if (isNestedExecution() || isApplicationTransaction()) { | |
return; | |
} | |
try { | |
if (isSuccessful(context)) { | |
transactionManager.commit(transactionStatus.get()); | |
} else { | |
rollbackOnException(transactionStatus.get(), context.exception()); | |
} | |
} finally { | |
transactionStatus.remove(); | |
} | |
} | |
/** Perform a rollback, logging if a rollback exception occurs. */ | |
private void rollbackOnException(TransactionStatus status, Throwable throwable) { | |
try { | |
transactionManager.rollback(status); | |
} catch (RuntimeException e) { | |
logger.error("Rollback exception supressed by application exception", e); | |
} | |
} | |
/** Increment the number of executions being performed on the connection. */ | |
private void incrementNestedExecutions() { | |
nestedExecutions.set(nestedExecutions.get() + 1); | |
} | |
/** Decrement the number of executions being performed on the connection. */ | |
private void decrementNestedExecutions() { | |
nestedExecutions.set(nestedExecutions.get() - 1); | |
} | |
/** If there are remaining executions being performed on the connection. */ | |
private boolean isNestedExecution() { | |
return nestedExecutions.get() > 0; | |
} | |
/** If the current thread is in a transactional context. */ | |
private boolean isInTransaction() { | |
return TransactionSynchronizationManager.isSynchronizationActive(); | |
} | |
/** If the transactional context was created by the application or the listener. */ | |
private boolean isApplicationTransaction() { | |
return (transactionStatus.get() == null); | |
} | |
/** If the operation was executed successfully. */ | |
private boolean isSuccessful(ExecuteContext context) { | |
return (context.exception() == null); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment