Skip to content

Instantly share code, notes, and snippets.

@davengeo
Last active April 4, 2018 17:25
Show Gist options
  • Save davengeo/3a3cadbd950a4eb41874fd51d6fe18d3 to your computer and use it in GitHub Desktop.
Save davengeo/3a3cadbd950a4eb41874fd51d6fe18d3 to your computer and use it in GitHub Desktop.
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {
private static final String NAMESPACE = "distributedlog://127.0.0.1:2181/messaging/distributedlog/testing";
@Autowired
JtaTransactionManager jtaTransactionManager;
private DistributedLogManager managerOne, managerTwo;
@Before
public void setup() throws IOException {
final Namespace namespace = Factories.namespace(NAMESPACE).get();
this.managerOne = namespace.openLog("stream-1");
this.managerTwo = namespace.openLog("stream-2");
managerOne.delete();
managerTwo.delete();
}
@Test
public void transactionInDlogHappyPath() throws SystemException, IOException, NotSupportedException {
final LogWriter logWriterOne = managerOne.openLogWriter();
final LogWriter logWriterTwo = managerTwo.openLogWriter();
final Transaction transaction = jtaTransactionManager.createTransaction("test1", 300);
final int txId = 1;
commitTransaction(logWriterOne, logWriterTwo, transaction, txId);
final LogRecordWithDLSN recordOne = managerOne.openLogReader(txId).readNext(false);
final LogRecordWithDLSN controlOne = managerOne.openLogReader(txId + 1).readNext(false);
assertThat(CustomLogRecord.isCommit(controlOne)).isTrue();
final LogRecordWithDLSN recordTwo = managerTwo.openLogReader(txId).readNext(false);
final LogRecordWithDLSN controlTwo = managerOne.openLogReader(txId + 1).readNext(false);
assertThat(CustomLogRecord.isCommit(controlTwo)).isTrue();
assertThat(recordOne.getPayload()).isEqualTo("ONE".getBytes());
assertThat(recordTwo.getPayload()).isEqualTo("TWO".getBytes());
assertThat(Try.of(managerOne::getLogRecordCount).get()).isEqualTo(2L);
assertThat(Try.of(managerTwo::getLogRecordCount).get()).isEqualTo(2L);
log.warn("managerOne {} records", managerOne.getLogRecordCount());
log.warn("managerTwo {} records", managerTwo.getLogRecordCount());
}
private void commitTransaction(LogWriter logWriterOne, LogWriter logWriterTwo, Transaction transaction, int txId) throws IOException {
try {
transaction.enlistResource(new DLogXAResource(logWriterOne, txId, ByteBuffer.wrap("ONE".getBytes())));
transaction.enlistResource(new DLogXAResource(logWriterTwo, txId, ByteBuffer.wrap("TWO".getBytes())));
transaction.commit();
} catch (Exception e) {
log.error("Found error in tx", e);
} finally {
logWriterOne.close();
logWriterTwo.close();
}
}
@Test
public void transactionInDlogFailingInPrepAndRollback() throws Exception {
final LogWriter logWriterOne = managerOne.openLogWriter();
final LogWriter logWriterTwo = Mockito.spy(managerTwo.openLogWriter());
final Transaction transaction = jtaTransactionManager.createTransaction("test-to-abort-one", 300);
// writerTwo always throws exception (even in the rollback)
doThrow(IOException.class).when(logWriterTwo).write(any(LogRecord.class));
final int txId = 1;
commitTransaction(logWriterOne, logWriterTwo, transaction, txId);
final LogRecordWithDLSN recordOne = managerOne.openLogReader(txId).readNext(false);
final LogRecordWithDLSN controlOne = managerOne.openLogReader(txId + 1).readNext(false);
assertThat(recordOne.getPayload()).isEqualTo("ONE".getBytes());
assertThat(CustomLogRecord.isAbort(controlOne)).isTrue();
assertThat(managerTwo.getLogRecordCount()).isZero();
log.warn("managerOne {} records", managerOne.getLogRecordCount());
log.warn("managerTwo {} records", managerTwo.getLogRecordCount());
}
@Test
public void transactionInDlogFailingOnlyInPrep() throws Exception {
final LogWriter logWriterOne = managerOne.openLogWriter();
final LogWriter logWriterTwo = Mockito.spy(managerTwo.openLogWriter());
final Transaction transaction = jtaTransactionManager.createTransaction("test-to-abort-two", 300);
// writerTwo only throws exception in prep (it will work in the rollback)
doThrow(IOException.class).doCallRealMethod().when(logWriterTwo).write(any(LogRecord.class));
final int txId = 1;
commitTransaction(logWriterOne, logWriterTwo, transaction, txId);
final LogRecordWithDLSN recordOne = managerOne.openLogReader(txId).readNext(false);
final LogRecordWithDLSN controlOne = managerOne.openLogReader(txId + 1).readNext(false);
assertThat(recordOne.getPayload()).isEqualTo("ONE".getBytes());
assertThat(CustomLogRecord.isAbort(controlOne)).isTrue();
final LogRecordWithDLSN controlTwo = managerTwo.openLogReader(txId).readNext(false);
assertThat(controlTwo.getTransactionId()).isNotEqualTo(txId).isEqualTo(txId + 1);
assertThat(CustomLogRecord.isAbort(controlTwo)).isTrue();
assertThat(managerTwo.getLogRecordCount()).isOne();
log.warn("managerOne {} records", managerOne.getLogRecordCount());
log.warn("managerTwo {} records", managerTwo.getLogRecordCount());
}
@TestConfiguration
public class MyConfig {
@Bean
public UserTransactionManager atomikosTransactionManager() {
final UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(false);
return userTransactionManager;
}
@Bean
public UserTransactionImp atomikosUserTransaction() {
return new UserTransactionImp();
}
@Bean
public JtaTransactionManager jtaTransactionManager(UserTransactionImp atomikosUserTransaction,
UserTransactionManager atomikosTransactionManager) {
final JtaTransactionManager jtaTransactionManager = new JtaTransactionManager();
jtaTransactionManager.setUserTransaction(atomikosUserTransaction);
jtaTransactionManager.setTransactionManager(atomikosTransactionManager);
return jtaTransactionManager;
}
}
}
@Slf4j
@SuppressWarnings("RedundantThrows")
class DLogXAResource implements XAResource {
private final LogWriter writer;
private final long txId;
private final ByteBuffer content;
private int timeout = 300;
private Xid xid;
DLogXAResource(LogWriter writer, long txId, ByteBuffer content) {
this.writer = writer;
this.txId = txId;
this.content = content;
}
@Override
public void start(Xid xid, int flags) throws XAException {
log.info("starting {} {}", xid, flags);
if (XAResource.XA_OK == flags) {
this.xid = xid;
}
}
@Override
public int prepare(Xid xid) throws XAException {
log.info("preparing {}", xid);
LogRecord record = new LogRecord(txId, content.array());
try {
writer.write(record);
writer.commit();
} catch (IOException e) {
log.error("Error in preparing", e);
throw new XAException(XA_RBCOMMFAIL);
}
return XA_OK;
}
@Override
public Xid[] recover(int i) throws XAException {
log.info("recovering {}", i);
if (Objects.isNull(xid)) {
return new Xid[0];
}
return new Xid[]{xid};
}
@Override
public void rollback(Xid xid) throws XAException {
log.info("rolling back {}", xid);
CustomLogRecord control = new CustomLogRecord(txId + 1, "".getBytes());
control.setAbortFlag();
writeRecord(control, XA_HEURRB);
log.info("end of rollback of {}", xid);
}
@Override
public void commit(Xid xid, boolean onePhase) throws XAException {
log.info("committing tx {} {}", xid, onePhase);
CustomLogRecord control = new CustomLogRecord(txId + 1, "".getBytes());
control.setCommitFlag();
writeRecord(control, XA_RBCOMMFAIL);
}
private void writeRecord(CustomLogRecord control, int status) throws XAException {
try {
writer.write(control);
writer.commit();
} catch (IOException e) {
log.error("Found error", e);
throw new XAException(status);
}
}
@Override
public void end(Xid xid, int flags) throws XAException {
log.info("ending tx {} {}", xid, flags);
if ((flags & TMSUCCESS) != 0) {
log.info("{} Tx SUCCESS", xid);
} else if ((flags & TMFAIL) != 0) {
log.warn("{} Tx FAILED", xid);
} else if ((flags & TMSUSPEND) != 0) {
log.warn("{} Tx SUSPEND", xid);
} else {
log.warn("{} ended with WTF {} status", xid, flags);
}
}
@Override
public void forget(Xid xid) throws XAException {
log.info("forgetting tx {}", xid);
try {
writer.abort();
} catch (IOException e) {
log.error("Error in recovery of {}:{}", xid, e.getMessage());
throw new XAException(XAER_RMERR);
}
}
@Override
public int getTransactionTimeout() throws XAException {
return this.timeout;
}
@Override
public boolean isSameRM(XAResource xaResource) throws XAException {
return xaResource.equals(this);
}
@Override
public boolean setTransactionTimeout(int timeout) throws XAException {
log.info("set transaction timeout to {}", timeout);
this.timeout = timeout;
return true;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DLogXAResource that = (DLogXAResource) o;
return txId == that.txId &&
timeout == that.timeout &&
Objects.equals(content, that.content) &&
Objects.equals(xid, that.xid);
}
@Override
public int hashCode() {
return Objects.hash(txId, content, timeout, xid);
}
}
@SuppressWarnings({"unused", "ConstantConditions", "WeakerAccess"})
class CustomLogRecord extends LogRecord {
private static final long LOGRECORD_FLAGS_COMMIT_MESSAGE = 8L;
private static final long LOGRECORD_FLAGS_ABORT_MESSAGE = 16L;
private static Field METADATA;
static {
METADATA = ReflectionUtils.findField(LogRecord.class, "metadata");
METADATA.setAccessible(true);
}
public CustomLogRecord(long l, byte[] bytes) {
super(l, bytes);
}
public void setCommitFlag() {
this.setMetadata(this.getMetadata() | LOGRECORD_FLAGS_COMMIT_MESSAGE);
}
public void setAbortFlag() {
this.setMetadata(this.getMetadata() | LOGRECORD_FLAGS_ABORT_MESSAGE);
}
public static boolean isCommit(LogRecord incoming) {
final Long metadata = (Long) ReflectionUtils.getField(METADATA, incoming);
return (metadata & LOGRECORD_FLAGS_COMMIT_MESSAGE) != 0;
}
public static boolean isAbort(LogRecord incoming) {
final Long metadata = (Long) ReflectionUtils.getField(METADATA, incoming);
return (metadata & LOGRECORD_FLAGS_ABORT_MESSAGE) != 0;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment