Created
June 22, 2017 09:24
-
-
Save johnou/9f19c794a9de82f3c144a80f5f1d304c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cloud.orbit.actors.cluster.MessageListener; | |
import cloud.orbit.actors.cluster.NodeAddress; | |
import cloud.orbit.actors.cluster.NodeAddressImpl; | |
import cloud.orbit.actors.cluster.ViewListener; | |
import cloud.orbit.concurrent.Task; | |
import cloud.orbit.exception.UncheckedException; | |
import com.sulake.h4k.common.orbit.ExtendedClusterPeer; | |
import org.infinispan.configuration.cache.CacheMode; | |
import org.infinispan.configuration.cache.ConfigurationBuilder; | |
import org.infinispan.configuration.global.GlobalConfigurationBuilder; | |
import org.infinispan.manager.DefaultCacheManager; | |
import org.infinispan.remoting.transport.jgroups.JGroupsTransport; | |
import org.jgroups.Address; | |
import org.jgroups.JChannel; | |
import org.jgroups.Message; | |
import org.jgroups.ReceiverAdapter; | |
import org.jgroups.View; | |
import org.jgroups.fork.ForkChannel; | |
import org.jgroups.jmx.JmxConfigurator; | |
import org.jgroups.protocols.FRAG3; | |
import org.jgroups.stack.ProtocolStack; | |
import org.jgroups.util.UUID; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.lang.management.ManagementFactory; | |
import java.net.MalformedURLException; | |
import java.net.URL; | |
import java.nio.file.Paths; | |
import java.util.Collections; | |
import java.util.HashMap; | |
import java.util.LinkedHashMap; | |
import java.util.Map; | |
import java.util.Objects; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.ConcurrentMap; | |
import java.util.concurrent.ForkJoinTask; | |
public class JGroupsClusterPeer implements ExtendedClusterPeer { | |
private static final Logger logger = LoggerFactory.getLogger(JGroupsClusterPeer.class); | |
private static final String REPLICATED_CONFIGURATION_NAME = "replicatedAsyncCache"; | |
private Task<Address> startFuture; | |
private ForkChannel channel; | |
private JChannel baseChannel; | |
private DefaultCacheManager cacheManager; | |
private NodeInfo local; | |
private volatile Map<Address, NodeInfo> nodeMap = new HashMap<>(); | |
private volatile Map<NodeAddress, NodeInfo> nodeMap2 = new HashMap<>(); | |
private ViewListener viewListener; | |
private MessageListener messageListener; | |
private String jgroupsConfig = "classpath:/conf/jgroups.xml"; | |
public JGroupsClusterPeer(String jgroupsConfig) { | |
this.jgroupsConfig = jgroupsConfig; | |
} | |
@Override | |
public NodeAddress localAddress() { | |
sync(); | |
return local.nodeAddress; | |
} | |
@Override | |
public void registerViewListener(final ViewListener viewListener) { | |
this.viewListener = viewListener; | |
} | |
@Override | |
public void registerMessageReceiver(final MessageListener messageListener) { | |
this.messageListener = messageListener; | |
} | |
static final class NodeInfo { | |
private Address address; | |
private NodeAddress nodeAddress; | |
NodeInfo(final Address address) { | |
this.address = address; | |
final UUID jgroupsUUID = (UUID) address; | |
this.nodeAddress = new NodeAddressImpl(new java.util.UUID(jgroupsUUID.getMostSignificantBits(), jgroupsUUID.getLeastSignificantBits())); | |
} | |
} | |
public Task<?> join(final String clusterName, final String nodeName) { | |
final ForkJoinTask<Address> f = ForkJoinTask.adapt(new Callable<Address>() { | |
@Override | |
public Address call() { | |
try { | |
if (System.getProperty("java.net.preferIPv4Stack", null) == null) { | |
System.setProperty("java.net.preferIPv4Stack", "true"); | |
} | |
// the parameter of this constructor defines the protocol stack | |
// we are using the default that allows discovery based on broadcast packets. | |
// It must be asserted that the production network support (enables) this. | |
// Otherwise it's also possible to change the discovery mechanism. | |
baseChannel = new JChannel(configToURL(jgroupsConfig)); | |
baseChannel.setName(nodeName); | |
channel = new ForkChannel(baseChannel, | |
"hijack-stack", | |
"lead-hijacker", | |
true, | |
ProtocolStack.Position.ABOVE, | |
FRAG3.class); | |
channel.setReceiver(new ReceiverAdapter() { | |
@Override | |
public void viewAccepted(final View view) { | |
doViewAccepted(view); | |
} | |
@Override | |
public void receive(final Message msg) { | |
doReceive(msg); | |
} | |
}); | |
JmxConfigurator.registerChannel(baseChannel, ManagementFactory.getPlatformMBeanServer(), clusterName); | |
final GlobalConfigurationBuilder globalConfigurationBuilder = GlobalConfigurationBuilder.defaultClusteredBuilder(); | |
globalConfigurationBuilder.globalJmxStatistics().allowDuplicateDomains(true); | |
globalConfigurationBuilder.transport().clusterName(clusterName).nodeName(nodeName).transport(new JGroupsTransport(baseChannel)); | |
ConfigurationBuilder builder = new ConfigurationBuilder(); | |
builder.clustering().cacheMode(CacheMode.DIST_ASYNC); | |
cacheManager = new DefaultCacheManager(globalConfigurationBuilder.build(), builder.build()); | |
ConfigurationBuilder builder2 = new ConfigurationBuilder(); | |
builder2.clustering().cacheMode(CacheMode.REPL_ASYNC); | |
cacheManager.defineConfiguration(REPLICATED_CONFIGURATION_NAME, builder2.build()); | |
// need to get a cache, any cache to force the initialization | |
cacheManager.getCache("distributedDirectory"); | |
channel.connect(clusterName); | |
local = new NodeInfo(channel.getAddress()); | |
logger.info("Registering the local address"); | |
logger.info("Done with JGroups initialization"); | |
return local.address; | |
} catch (final Exception e) { | |
logger.error("Error during JGroups initialization", e); | |
throw new UncheckedException(e); | |
} | |
} | |
}); | |
startFuture = Task.fromFuture(f); | |
f.fork(); | |
return startFuture; | |
} | |
private URL configToURL(final String jgroupsConfig) throws MalformedURLException { | |
if (jgroupsConfig.startsWith("classpath:")) { | |
// classpath resource | |
final String resourcePath = jgroupsConfig.substring("classpath:".length()); | |
final URL resource = getClass().getResource(resourcePath); | |
if (resource == null) { | |
throw new IllegalArgumentException("Can't find classpath resource: " + resourcePath); | |
} | |
return resource; | |
} | |
if (!jgroupsConfig.contains(":")) { | |
// normal file | |
return Paths.get(jgroupsConfig).toUri().toURL(); | |
} | |
return new URL(jgroupsConfig); | |
} | |
@Override | |
public void leave() { | |
channel.close(); | |
channel = null; | |
cacheManager.stop(); | |
baseChannel.close(); | |
baseChannel = null; | |
} | |
// ensures that the channel is connected | |
private void sync() { | |
if (startFuture != null && !startFuture.isDone()) { | |
startFuture.join(); | |
} | |
} | |
private void doViewAccepted(final View view) { | |
final LinkedHashMap<Address, NodeInfo> newNodes = new LinkedHashMap<>(view.size()); | |
final LinkedHashMap<NodeAddress, NodeInfo> newNodes2 = new LinkedHashMap<>(view.size()); | |
for (final Address a : view) { | |
NodeInfo info = nodeMap.get(a); | |
if (info == null) { | |
info = new NodeInfo(a); | |
} | |
newNodes.put(a, info); | |
newNodes2.put(info.nodeAddress, info); | |
} | |
nodeMap = Collections.unmodifiableMap(newNodes); | |
nodeMap2 = Collections.unmodifiableMap(newNodes2); | |
viewListener.onViewChange(nodeMap2.keySet()); | |
} | |
@SuppressWarnings("PMD.AvoidThrowingNullPointerException") | |
@Override | |
public void sendMessage(NodeAddress address, byte message[]) { | |
try { | |
Objects.requireNonNull(address, "node address"); | |
final NodeInfo node = nodeMap2.get(address); | |
if (node == null) { | |
throw new IllegalArgumentException("Cluster node not found: " + address); | |
} | |
ForkChannel channel = this.channel; | |
if (channel == null || !channel.isOpen()) { | |
throw new IllegalStateException("Cluster not connected"); | |
} | |
channel.send(node.address, message); | |
} catch (Exception e) { | |
throw new UncheckedException(e); | |
} | |
} | |
@Override | |
@SuppressWarnings("unchecked") | |
public <K, V> ConcurrentMap<K, V> getCache(final String name) { | |
return cacheManager.getCache(name); | |
} | |
@SuppressWarnings("unchecked") | |
public <K, V> ConcurrentMap<K, V> getReplicatedCache(final String name) { | |
return cacheManager.getCache(name, REPLICATED_CONFIGURATION_NAME); | |
} | |
private void doReceive(final Message msg) { | |
final NodeInfo nodeInfo = nodeMap.get(msg.getSrc()); | |
if (nodeInfo == null) { | |
logger.warn("Received message from invalid address {}", msg.getSrc()); | |
messageListener.receive(new NodeAddressImpl(new java.util.UUID(((UUID) msg.getSrc()).getMostSignificantBits(), ((UUID) msg.getSrc()).getLeastSignificantBits())), msg.getBuffer()); | |
} else { | |
messageListener.receive(nodeInfo.nodeAddress, msg.getBuffer()); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment