Skip to content

Instantly share code, notes, and snippets.

@johnou
Created June 22, 2017 09:24
Show Gist options
  • Save johnou/9f19c794a9de82f3c144a80f5f1d304c to your computer and use it in GitHub Desktop.
Save johnou/9f19c794a9de82f3c144a80f5f1d304c to your computer and use it in GitHub Desktop.
import cloud.orbit.actors.cluster.MessageListener;
import cloud.orbit.actors.cluster.NodeAddress;
import cloud.orbit.actors.cluster.NodeAddressImpl;
import cloud.orbit.actors.cluster.ViewListener;
import cloud.orbit.concurrent.Task;
import cloud.orbit.exception.UncheckedException;
import com.sulake.h4k.common.orbit.ExtendedClusterPeer;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.jgroups.fork.ForkChannel;
import org.jgroups.jmx.JmxConfigurator;
import org.jgroups.protocols.FRAG3;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.management.ManagementFactory;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ForkJoinTask;
public class JGroupsClusterPeer implements ExtendedClusterPeer {
private static final Logger logger = LoggerFactory.getLogger(JGroupsClusterPeer.class);
private static final String REPLICATED_CONFIGURATION_NAME = "replicatedAsyncCache";
private Task<Address> startFuture;
private ForkChannel channel;
private JChannel baseChannel;
private DefaultCacheManager cacheManager;
private NodeInfo local;
private volatile Map<Address, NodeInfo> nodeMap = new HashMap<>();
private volatile Map<NodeAddress, NodeInfo> nodeMap2 = new HashMap<>();
private ViewListener viewListener;
private MessageListener messageListener;
private String jgroupsConfig = "classpath:/conf/jgroups.xml";
public JGroupsClusterPeer(String jgroupsConfig) {
this.jgroupsConfig = jgroupsConfig;
}
@Override
public NodeAddress localAddress() {
sync();
return local.nodeAddress;
}
@Override
public void registerViewListener(final ViewListener viewListener) {
this.viewListener = viewListener;
}
@Override
public void registerMessageReceiver(final MessageListener messageListener) {
this.messageListener = messageListener;
}
static final class NodeInfo {
private Address address;
private NodeAddress nodeAddress;
NodeInfo(final Address address) {
this.address = address;
final UUID jgroupsUUID = (UUID) address;
this.nodeAddress = new NodeAddressImpl(new java.util.UUID(jgroupsUUID.getMostSignificantBits(), jgroupsUUID.getLeastSignificantBits()));
}
}
public Task<?> join(final String clusterName, final String nodeName) {
final ForkJoinTask<Address> f = ForkJoinTask.adapt(new Callable<Address>() {
@Override
public Address call() {
try {
if (System.getProperty("java.net.preferIPv4Stack", null) == null) {
System.setProperty("java.net.preferIPv4Stack", "true");
}
// the parameter of this constructor defines the protocol stack
// we are using the default that allows discovery based on broadcast packets.
// It must be asserted that the production network support (enables) this.
// Otherwise it's also possible to change the discovery mechanism.
baseChannel = new JChannel(configToURL(jgroupsConfig));
baseChannel.setName(nodeName);
channel = new ForkChannel(baseChannel,
"hijack-stack",
"lead-hijacker",
true,
ProtocolStack.Position.ABOVE,
FRAG3.class);
channel.setReceiver(new ReceiverAdapter() {
@Override
public void viewAccepted(final View view) {
doViewAccepted(view);
}
@Override
public void receive(final Message msg) {
doReceive(msg);
}
});
JmxConfigurator.registerChannel(baseChannel, ManagementFactory.getPlatformMBeanServer(), clusterName);
final GlobalConfigurationBuilder globalConfigurationBuilder = GlobalConfigurationBuilder.defaultClusteredBuilder();
globalConfigurationBuilder.globalJmxStatistics().allowDuplicateDomains(true);
globalConfigurationBuilder.transport().clusterName(clusterName).nodeName(nodeName).transport(new JGroupsTransport(baseChannel));
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.clustering().cacheMode(CacheMode.DIST_ASYNC);
cacheManager = new DefaultCacheManager(globalConfigurationBuilder.build(), builder.build());
ConfigurationBuilder builder2 = new ConfigurationBuilder();
builder2.clustering().cacheMode(CacheMode.REPL_ASYNC);
cacheManager.defineConfiguration(REPLICATED_CONFIGURATION_NAME, builder2.build());
// need to get a cache, any cache to force the initialization
cacheManager.getCache("distributedDirectory");
channel.connect(clusterName);
local = new NodeInfo(channel.getAddress());
logger.info("Registering the local address");
logger.info("Done with JGroups initialization");
return local.address;
} catch (final Exception e) {
logger.error("Error during JGroups initialization", e);
throw new UncheckedException(e);
}
}
});
startFuture = Task.fromFuture(f);
f.fork();
return startFuture;
}
private URL configToURL(final String jgroupsConfig) throws MalformedURLException {
if (jgroupsConfig.startsWith("classpath:")) {
// classpath resource
final String resourcePath = jgroupsConfig.substring("classpath:".length());
final URL resource = getClass().getResource(resourcePath);
if (resource == null) {
throw new IllegalArgumentException("Can't find classpath resource: " + resourcePath);
}
return resource;
}
if (!jgroupsConfig.contains(":")) {
// normal file
return Paths.get(jgroupsConfig).toUri().toURL();
}
return new URL(jgroupsConfig);
}
@Override
public void leave() {
channel.close();
channel = null;
cacheManager.stop();
baseChannel.close();
baseChannel = null;
}
// ensures that the channel is connected
private void sync() {
if (startFuture != null && !startFuture.isDone()) {
startFuture.join();
}
}
private void doViewAccepted(final View view) {
final LinkedHashMap<Address, NodeInfo> newNodes = new LinkedHashMap<>(view.size());
final LinkedHashMap<NodeAddress, NodeInfo> newNodes2 = new LinkedHashMap<>(view.size());
for (final Address a : view) {
NodeInfo info = nodeMap.get(a);
if (info == null) {
info = new NodeInfo(a);
}
newNodes.put(a, info);
newNodes2.put(info.nodeAddress, info);
}
nodeMap = Collections.unmodifiableMap(newNodes);
nodeMap2 = Collections.unmodifiableMap(newNodes2);
viewListener.onViewChange(nodeMap2.keySet());
}
@SuppressWarnings("PMD.AvoidThrowingNullPointerException")
@Override
public void sendMessage(NodeAddress address, byte message[]) {
try {
Objects.requireNonNull(address, "node address");
final NodeInfo node = nodeMap2.get(address);
if (node == null) {
throw new IllegalArgumentException("Cluster node not found: " + address);
}
ForkChannel channel = this.channel;
if (channel == null || !channel.isOpen()) {
throw new IllegalStateException("Cluster not connected");
}
channel.send(node.address, message);
} catch (Exception e) {
throw new UncheckedException(e);
}
}
@Override
@SuppressWarnings("unchecked")
public <K, V> ConcurrentMap<K, V> getCache(final String name) {
return cacheManager.getCache(name);
}
@SuppressWarnings("unchecked")
public <K, V> ConcurrentMap<K, V> getReplicatedCache(final String name) {
return cacheManager.getCache(name, REPLICATED_CONFIGURATION_NAME);
}
private void doReceive(final Message msg) {
final NodeInfo nodeInfo = nodeMap.get(msg.getSrc());
if (nodeInfo == null) {
logger.warn("Received message from invalid address {}", msg.getSrc());
messageListener.receive(new NodeAddressImpl(new java.util.UUID(((UUID) msg.getSrc()).getMostSignificantBits(), ((UUID) msg.getSrc()).getLeastSignificantBits())), msg.getBuffer());
} else {
messageListener.receive(nodeInfo.nodeAddress, msg.getBuffer());
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment