Last active
April 4, 2018 17:25
-
-
Save davengeo/3a3cadbd950a4eb41874fd51d6fe18d3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Slf4j | |
@RunWith(SpringRunner.class) | |
@SpringBootTest | |
public class DemoApplicationTests { | |
private static final String NAMESPACE = "distributedlog://127.0.0.1:2181/messaging/distributedlog/testing"; | |
@Autowired | |
JtaTransactionManager jtaTransactionManager; | |
private DistributedLogManager managerOne, managerTwo; | |
@Before | |
public void setup() throws IOException { | |
final Namespace namespace = Factories.namespace(NAMESPACE).get(); | |
this.managerOne = namespace.openLog("stream-1"); | |
this.managerTwo = namespace.openLog("stream-2"); | |
managerOne.delete(); | |
managerTwo.delete(); | |
} | |
@Test | |
public void transactionInDlogHappyPath() throws SystemException, IOException, NotSupportedException { | |
final LogWriter logWriterOne = managerOne.openLogWriter(); | |
final LogWriter logWriterTwo = managerTwo.openLogWriter(); | |
final Transaction transaction = jtaTransactionManager.createTransaction("test1", 300); | |
final int txId = 1; | |
commitTransaction(logWriterOne, logWriterTwo, transaction, txId); | |
final LogRecordWithDLSN recordOne = managerOne.openLogReader(txId).readNext(false); | |
final LogRecordWithDLSN controlOne = managerOne.openLogReader(txId + 1).readNext(false); | |
assertThat(CustomLogRecord.isCommit(controlOne)).isTrue(); | |
final LogRecordWithDLSN recordTwo = managerTwo.openLogReader(txId).readNext(false); | |
final LogRecordWithDLSN controlTwo = managerOne.openLogReader(txId + 1).readNext(false); | |
assertThat(CustomLogRecord.isCommit(controlTwo)).isTrue(); | |
assertThat(recordOne.getPayload()).isEqualTo("ONE".getBytes()); | |
assertThat(recordTwo.getPayload()).isEqualTo("TWO".getBytes()); | |
assertThat(Try.of(managerOne::getLogRecordCount).get()).isEqualTo(2L); | |
assertThat(Try.of(managerTwo::getLogRecordCount).get()).isEqualTo(2L); | |
log.warn("managerOne {} records", managerOne.getLogRecordCount()); | |
log.warn("managerTwo {} records", managerTwo.getLogRecordCount()); | |
} | |
private void commitTransaction(LogWriter logWriterOne, LogWriter logWriterTwo, Transaction transaction, int txId) throws IOException { | |
try { | |
transaction.enlistResource(new DLogXAResource(logWriterOne, txId, ByteBuffer.wrap("ONE".getBytes()))); | |
transaction.enlistResource(new DLogXAResource(logWriterTwo, txId, ByteBuffer.wrap("TWO".getBytes()))); | |
transaction.commit(); | |
} catch (Exception e) { | |
log.error("Found error in tx", e); | |
} finally { | |
logWriterOne.close(); | |
logWriterTwo.close(); | |
} | |
} | |
@Test | |
public void transactionInDlogFailingInPrepAndRollback() throws Exception { | |
final LogWriter logWriterOne = managerOne.openLogWriter(); | |
final LogWriter logWriterTwo = Mockito.spy(managerTwo.openLogWriter()); | |
final Transaction transaction = jtaTransactionManager.createTransaction("test-to-abort-one", 300); | |
// writerTwo always throws exception (even in the rollback) | |
doThrow(IOException.class).when(logWriterTwo).write(any(LogRecord.class)); | |
final int txId = 1; | |
commitTransaction(logWriterOne, logWriterTwo, transaction, txId); | |
final LogRecordWithDLSN recordOne = managerOne.openLogReader(txId).readNext(false); | |
final LogRecordWithDLSN controlOne = managerOne.openLogReader(txId + 1).readNext(false); | |
assertThat(recordOne.getPayload()).isEqualTo("ONE".getBytes()); | |
assertThat(CustomLogRecord.isAbort(controlOne)).isTrue(); | |
assertThat(managerTwo.getLogRecordCount()).isZero(); | |
log.warn("managerOne {} records", managerOne.getLogRecordCount()); | |
log.warn("managerTwo {} records", managerTwo.getLogRecordCount()); | |
} | |
@Test | |
public void transactionInDlogFailingOnlyInPrep() throws Exception { | |
final LogWriter logWriterOne = managerOne.openLogWriter(); | |
final LogWriter logWriterTwo = Mockito.spy(managerTwo.openLogWriter()); | |
final Transaction transaction = jtaTransactionManager.createTransaction("test-to-abort-two", 300); | |
// writerTwo only throws exception in prep (it will work in the rollback) | |
doThrow(IOException.class).doCallRealMethod().when(logWriterTwo).write(any(LogRecord.class)); | |
final int txId = 1; | |
commitTransaction(logWriterOne, logWriterTwo, transaction, txId); | |
final LogRecordWithDLSN recordOne = managerOne.openLogReader(txId).readNext(false); | |
final LogRecordWithDLSN controlOne = managerOne.openLogReader(txId + 1).readNext(false); | |
assertThat(recordOne.getPayload()).isEqualTo("ONE".getBytes()); | |
assertThat(CustomLogRecord.isAbort(controlOne)).isTrue(); | |
final LogRecordWithDLSN controlTwo = managerTwo.openLogReader(txId).readNext(false); | |
assertThat(controlTwo.getTransactionId()).isNotEqualTo(txId).isEqualTo(txId + 1); | |
assertThat(CustomLogRecord.isAbort(controlTwo)).isTrue(); | |
assertThat(managerTwo.getLogRecordCount()).isOne(); | |
log.warn("managerOne {} records", managerOne.getLogRecordCount()); | |
log.warn("managerTwo {} records", managerTwo.getLogRecordCount()); | |
} | |
@TestConfiguration | |
public class MyConfig { | |
@Bean | |
public UserTransactionManager atomikosTransactionManager() { | |
final UserTransactionManager userTransactionManager = new UserTransactionManager(); | |
userTransactionManager.setForceShutdown(false); | |
return userTransactionManager; | |
} | |
@Bean | |
public UserTransactionImp atomikosUserTransaction() { | |
return new UserTransactionImp(); | |
} | |
@Bean | |
public JtaTransactionManager jtaTransactionManager(UserTransactionImp atomikosUserTransaction, | |
UserTransactionManager atomikosTransactionManager) { | |
final JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(); | |
jtaTransactionManager.setUserTransaction(atomikosUserTransaction); | |
jtaTransactionManager.setTransactionManager(atomikosTransactionManager); | |
return jtaTransactionManager; | |
} | |
} | |
} | |
@Slf4j | |
@SuppressWarnings("RedundantThrows") | |
class DLogXAResource implements XAResource { | |
private final LogWriter writer; | |
private final long txId; | |
private final ByteBuffer content; | |
private int timeout = 300; | |
private Xid xid; | |
DLogXAResource(LogWriter writer, long txId, ByteBuffer content) { | |
this.writer = writer; | |
this.txId = txId; | |
this.content = content; | |
} | |
@Override | |
public void start(Xid xid, int flags) throws XAException { | |
log.info("starting {} {}", xid, flags); | |
if (XAResource.XA_OK == flags) { | |
this.xid = xid; | |
} | |
} | |
@Override | |
public int prepare(Xid xid) throws XAException { | |
log.info("preparing {}", xid); | |
LogRecord record = new LogRecord(txId, content.array()); | |
try { | |
writer.write(record); | |
writer.commit(); | |
} catch (IOException e) { | |
log.error("Error in preparing", e); | |
throw new XAException(XA_RBCOMMFAIL); | |
} | |
return XA_OK; | |
} | |
@Override | |
public Xid[] recover(int i) throws XAException { | |
log.info("recovering {}", i); | |
if (Objects.isNull(xid)) { | |
return new Xid[0]; | |
} | |
return new Xid[]{xid}; | |
} | |
@Override | |
public void rollback(Xid xid) throws XAException { | |
log.info("rolling back {}", xid); | |
CustomLogRecord control = new CustomLogRecord(txId + 1, "".getBytes()); | |
control.setAbortFlag(); | |
writeRecord(control, XA_HEURRB); | |
log.info("end of rollback of {}", xid); | |
} | |
@Override | |
public void commit(Xid xid, boolean onePhase) throws XAException { | |
log.info("committing tx {} {}", xid, onePhase); | |
CustomLogRecord control = new CustomLogRecord(txId + 1, "".getBytes()); | |
control.setCommitFlag(); | |
writeRecord(control, XA_RBCOMMFAIL); | |
} | |
private void writeRecord(CustomLogRecord control, int status) throws XAException { | |
try { | |
writer.write(control); | |
writer.commit(); | |
} catch (IOException e) { | |
log.error("Found error", e); | |
throw new XAException(status); | |
} | |
} | |
@Override | |
public void end(Xid xid, int flags) throws XAException { | |
log.info("ending tx {} {}", xid, flags); | |
if ((flags & TMSUCCESS) != 0) { | |
log.info("{} Tx SUCCESS", xid); | |
} else if ((flags & TMFAIL) != 0) { | |
log.warn("{} Tx FAILED", xid); | |
} else if ((flags & TMSUSPEND) != 0) { | |
log.warn("{} Tx SUSPEND", xid); | |
} else { | |
log.warn("{} ended with WTF {} status", xid, flags); | |
} | |
} | |
@Override | |
public void forget(Xid xid) throws XAException { | |
log.info("forgetting tx {}", xid); | |
try { | |
writer.abort(); | |
} catch (IOException e) { | |
log.error("Error in recovery of {}:{}", xid, e.getMessage()); | |
throw new XAException(XAER_RMERR); | |
} | |
} | |
@Override | |
public int getTransactionTimeout() throws XAException { | |
return this.timeout; | |
} | |
@Override | |
public boolean isSameRM(XAResource xaResource) throws XAException { | |
return xaResource.equals(this); | |
} | |
@Override | |
public boolean setTransactionTimeout(int timeout) throws XAException { | |
log.info("set transaction timeout to {}", timeout); | |
this.timeout = timeout; | |
return true; | |
} | |
@Override | |
public boolean equals(Object o) { | |
if (this == o) return true; | |
if (o == null || getClass() != o.getClass()) return false; | |
DLogXAResource that = (DLogXAResource) o; | |
return txId == that.txId && | |
timeout == that.timeout && | |
Objects.equals(content, that.content) && | |
Objects.equals(xid, that.xid); | |
} | |
@Override | |
public int hashCode() { | |
return Objects.hash(txId, content, timeout, xid); | |
} | |
} | |
@SuppressWarnings({"unused", "ConstantConditions", "WeakerAccess"}) | |
class CustomLogRecord extends LogRecord { | |
private static final long LOGRECORD_FLAGS_COMMIT_MESSAGE = 8L; | |
private static final long LOGRECORD_FLAGS_ABORT_MESSAGE = 16L; | |
private static Field METADATA; | |
static { | |
METADATA = ReflectionUtils.findField(LogRecord.class, "metadata"); | |
METADATA.setAccessible(true); | |
} | |
public CustomLogRecord(long l, byte[] bytes) { | |
super(l, bytes); | |
} | |
public void setCommitFlag() { | |
this.setMetadata(this.getMetadata() | LOGRECORD_FLAGS_COMMIT_MESSAGE); | |
} | |
public void setAbortFlag() { | |
this.setMetadata(this.getMetadata() | LOGRECORD_FLAGS_ABORT_MESSAGE); | |
} | |
public static boolean isCommit(LogRecord incoming) { | |
final Long metadata = (Long) ReflectionUtils.getField(METADATA, incoming); | |
return (metadata & LOGRECORD_FLAGS_COMMIT_MESSAGE) != 0; | |
} | |
public static boolean isAbort(LogRecord incoming) { | |
final Long metadata = (Long) ReflectionUtils.getField(METADATA, incoming); | |
return (metadata & LOGRECORD_FLAGS_ABORT_MESSAGE) != 0; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment