@stevenschlansker
Last active December 8, 2016 00:26
Hi jgroups-users,
I am developing my first project based on JGroups 4.0.0b3.
It is a relatively simple replicated state machine.
Each state transition is sent as a Message.
Members may join or leave the cluster at any time.
My goal is that, for any sequence of member joins, leaves, and crashes
(barring a total outage in which every member dies), the state machine
stays reliably replicated.
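The replication model described above can be sketched in plain Java. This is a minimal illustration under assumptions, not the project's code: `Update` and `ReplicaState` are hypothetical stand-ins for `AnnouncementUpdate` and `State`, and the transition itself (put a key/value pair, then bump an `index` counter) is invented for the example. It shows the property the stack is meant to guarantee: a replica that replays every update and a late joiner that installs a snapshot and then applies only the remaining updates end up identical, as long as both see the same updates in the same order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical state transition: set `key` to `value`.
final class Update {
    final String key;
    final String value;

    Update(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical replicated state: a map plus the number of updates applied so far.
final class ReplicaState {
    long index;
    final Map<String, String> entries = new TreeMap<>();

    // Deterministic transition: same state + same update gives the same result on every replica.
    void update(Update u) {
        entries.put(u.key, u.value);
        index++;
    }

    // Copy used as the state-transfer snapshot.
    ReplicaState copy() {
        ReplicaState c = new ReplicaState();
        c.index = index;
        c.entries.putAll(entries);
        return c;
    }

    boolean sameAs(ReplicaState other) {
        return index == other.index && entries.equals(other.entries);
    }
}

final class ReplicationDemo {
    // Hypothetical totally ordered update stream.
    static final List<Update> UPDATES = Arrays.asList(
            new Update("a", "1"), new Update("b", "2"), new Update("a", "3"));

    // Replica that has seen every update from the start.
    static ReplicaState fullReplay() {
        ReplicaState s = new ReplicaState();
        for (Update u : UPDATES) s.update(u);
        return s;
    }

    // Late joiner: installs a snapshot taken after `k` updates, then applies only the rest.
    static ReplicaState snapshotThenSuffix(int k) {
        ReplicaState donor = new ReplicaState();
        for (Update u : UPDATES.subList(0, k)) donor.update(u);
        ReplicaState joiner = donor.copy();
        for (Update u : UPDATES.subList(k, UPDATES.size())) joiner.update(u);
        return joiner;
    }
}
```

If the joiner were also handed an update that the snapshot already contains, its `index` would run ahead of the donor's and the replicas would diverge.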
Right now I am trying to get state transfer working.
My first pass at the code went well, but now that I have deployed it
to the real world, I am starting to think I don't fully understand
how to get this right.
My stack is as follows:
    channel = new JChannel(
            // Protocols are listed bottom-up: the transport (TCP) first, the top of the stack last
            new TCP(),
            new TCPPING(),
            new FLUSH(),                            // Inhibit message delivery during topology changes
            new MERGE3(),                           // Detect and merge partitions together
            new FD_ALL2(),                          // Detect failures of peer nodes
            new BARRIER(),                          // State transfer also inhibits messages
            new NAKACK2().setUseMcastXmit(false),   // Reliable delivery; multicast doesn't work in AWS
            new UNICAST3(),                         // Reliable delivery (required for STABLE)
            new STABLE(),                           // Garbage collect old messages
            new GMS(),                              // Maintain cluster membership
            new SEQUENCER(),                        // Create a total ordering over all messages
            new STATE());                           // New members get existing state
    channel.setName(id);
    channel.setReceiver(this);
    channel.connect("discovery", null, 10000);      // Join, then fetch state from the coordinator (10 s timeout)
A simplified version of my Receiver:
    @Override
    public void receive(MessageBatch batch) {
        batch.forEach(this::receive);
    }

    @Override
    public void receive(Message msg) {
        AnnouncementUpdate update;
        try {
            // Honor the message's offset/length instead of decoding the whole backing array
            update = mapper.readValue(msg.getRawBuffer(), msg.getOffset(), msg.getLength(),
                    AnnouncementUpdate.class);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        state.update(update); // apply transition to state machine
        LOG.info("innerReceive {} {}", state.index, update);
    }

    @Override
    public void getState(OutputStream output) throws Exception {
        new ObjectMapper().writeValue(output, state); // Jackson JSON serialization
    }

    @Override
    public void setState(InputStream input) throws Exception {
        state = new ObjectMapper().readValue(input, State.class);
        LOG.info("RECEIVED STATE @{}", state.index);
    }
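One defensive pattern for the symptom described below, sketched under stated assumptions and independent of whether the stack itself is configured correctly: tag every update with its sender and a per-sender counter, record the highest counter per sender inside the transferred state, and on the joiner buffer anything delivered before `setState` completes. After the snapshot is installed, replay only the buffered updates the snapshot does not already contain, and drop later duplicates the same way. `TaggedUpdate`, `Snapshot`, `GuardedReplica`, `origin`, `counter`, and `delta` are hypothetical names, not JGroups API; the per-sender counters assume FIFO delivery per sender (which NAKACK2 provides) and a total order across senders (which SEQUENCER is meant to provide).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical update tagged with its sender and a per-sender counter (1, 2, 3, ...).
final class TaggedUpdate {
    final String origin;
    final long counter;
    final int delta; // hypothetical payload: add `delta` to a running total

    TaggedUpdate(String origin, long counter, int delta) {
        this.origin = origin;
        this.counter = counter;
        this.delta = delta;
    }
}

// Hypothetical transferred state: the data plus the highest counter applied per sender.
final class Snapshot {
    final long total;
    final Map<String, Long> applied;

    Snapshot(long total, Map<String, Long> applied) {
        this.total = total;
        this.applied = new HashMap<>(applied);
    }
}

final class GuardedReplica {
    private long total;
    private final Map<String, Long> applied = new HashMap<>();
    private final List<TaggedUpdate> early = new ArrayList<>();
    private boolean stateInstalled;

    // Founding member: no state transfer, start from the empty state.
    synchronized void startEmpty() {
        stateInstalled = true;
    }

    // Called for every delivered update (cf. receive(Message)).
    synchronized void receive(TaggedUpdate u) {
        if (!stateInstalled) {
            early.add(u); // delivered too early: hold it until the snapshot arrives
            return;
        }
        applyIfNew(u);
    }

    // Donor side (cf. getState(OutputStream)).
    synchronized Snapshot getState() {
        return new Snapshot(total, applied);
    }

    // Joiner side (cf. setState(InputStream)): install, then replay buffered updates in order.
    synchronized void setState(Snapshot s) {
        total = s.total;
        applied.clear();
        applied.putAll(s.applied);
        stateInstalled = true;
        for (TaggedUpdate u : early) applyIfNew(u);
        early.clear();
    }

    synchronized long total() {
        return total;
    }

    private void applyIfNew(TaggedUpdate u) {
        long last = applied.getOrDefault(u.origin, 0L);
        if (u.counter <= last) return; // already reflected in the state: skip it
        total += u.delta;
        applied.put(u.origin, u.counter);
    }
}
```

The guard makes early or repeated deliveries harmless rather than explaining them; it does not replace finding out why FLUSH lets those messages through.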
However, the members of the cluster are not seeing messages the way I expect.
When a member joins, I expect it to first run TCPPING discovery, then update its
view (cluster membership) through GMS, use FLUSH and BARRIER to block all messages,
transfer the STATE, and finally release FLUSH and BARRIER so normal operation resumes.
Specifically, I expect that a new member will see zero messages before the STATE transfer, and
that the set and order of messages the new member sees after the STATE transfer are exactly the
same as those the existing member sees after it sends its STATE.
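These two expectations can be written down as a check over delivery logs, which may help when reading the attached trace. This is a minimal sketch, assuming each member records the identifiers of the updates it delivers and that the logs are compared once traffic has stopped; the log format and the `JoinCheck.expectedJoin` name are hypothetical. The joiner must deliver nothing before the state arrives, and afterwards must deliver exactly the existing member's post-snapshot updates, in the same order.

```java
import java.util.List;

final class JoinCheck {
    /**
     * @param donorLog      update ids delivered by the existing member, in order
     * @param snapshotIndex number of donor updates contained in the transferred state
     * @param joinerBefore  update ids the new member delivered before setState finished
     * @param joinerAfter   update ids the new member delivered after setState finished
     */
    static boolean expectedJoin(List<String> donorLog, int snapshotIndex,
                                List<String> joinerBefore, List<String> joinerAfter) {
        if (!joinerBefore.isEmpty()) {
            return false; // expectation 1 violated: messages arrived before the state
        }
        // Expectation 2: same set and order as the donor after its snapshot.
        List<String> donorAfter = donorLog.subList(snapshotIndex, donorLog.size());
        return donorAfter.equals(joinerAfter);
    }
}
```

Under this check, the behavior described next fails the first condition as soon as `receive` runs before `setState` finishes.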
But I am seeing messages delivered *before and during* STATE transfer!
Consider this log output:
https://gist.githubusercontent.com/stevenschlansker/195b6f02761768f205e2af4de218ff3f/raw/fe5733b9e2243a0bce917f17845729712d36a40f/gistfile1.txt
You can see that between the channel.connect() call and the moment MyReceiver.setState finishes,
roughly 10 messages are passed through to MyReceiver.receive.
This destroys my state machine: transitions are now applied multiple times, or concurrently with a STATE transfer,
so the servers never converge on a consistent state.
Suspiciously, I see FLUSH reporting that it receives STOP_FLUSH -- but there was never a corresponding START_FLUSH message or the like.
I can't help but feel I'm missing something obvious. Full TRACE logs (which are very large) available by request.
Thanks for any and all advice!
Best,
Steven