Last active
August 26, 2022 07:47
-
-
Save nshaw/5111802 to your computer and use it in GitHub Desktop.
v3. Sorted nodes visible to ClusterExecutor, added default heartbeat value to support LR 6.0
v2. Added more logging to improve readability, removed some logic which was examining packet content, changed duration to be based on configured heartbeat interval
v1. First revision
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.liferay.portal.kernel.cluster.ClusterExecutorUtil | |
import com.liferay.portal.kernel.cluster.ClusterNode | |
import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayInputStream | |
import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream | |
import com.liferay.portal.kernel.util.DateUtil | |
import com.liferay.portal.kernel.util.GetterUtil | |
import com.liferay.portal.kernel.util.PropsUtil | |
import com.liferay.portal.kernel.util.StringBundler | |
import com.liferay.util.transport.DatagramHandler | |
import com.liferay.util.transport.MulticastDatagramHandler | |
import com.liferay.util.transport.MulticastTransport | |
import com.liferay.util.transport.Transport | |
import org.apache.commons.logging.Log | |
import org.apache.commons.logging.LogFactory | |
import java.util.concurrent.ExecutorService | |
import java.util.concurrent.Executors | |
import java.util.concurrent.TimeUnit | |
import java.util.concurrent.TimeoutException | |
import java.util.zip.GZIPInputStream | |
//////// | |
//This script does two things when executed through the Server Admin - Script console. | |
// 1) Display the cluster nodes currently known to the ClusterExecutor. | |
// 2) Attach a multicast handler to the control channel and record any nodes sending traffic (e.g. heartbeats) on that channel. | |
// | |
// If you turn up logging to at least info on com.liferay.util.transport, you'll also see each datagram packet logged to the | |
// server logs. | |
/////// | |
try { | |
boolean clusteringEnabled = ClusterExecutorUtil.isEnabled(); | |
println "Clustering enabled: " + clusteringEnabled; | |
if (!clusteringEnabled) { | |
return; | |
} | |
println "-----" | |
List<ClusterNode> nodes = ClusterExecutorUtil.getClusterNodes() | |
int numberOfNodes = nodes.size(); | |
println "Number of nodes registered in ClusterExecutor: " + numberOfNodes | |
List<ClusterNode> sortedNodes = nodes.sort(new Comparator<ClusterNode>() { | |
int compare(ClusterNode o1, ClusterNode o2) { | |
return o1.getHostName().compareTo(o2.getHostName()); | |
} | |
}) | |
for (ClusterNode node : sortedNodes) { | |
println node.getHostName() + "," + | |
node.getInetAddress() + "," + | |
node.getPort() + "," + | |
node.getClusterNodeId() | |
} | |
// Defaults for the heartbeat/control-channel | |
String host = | |
PropsUtil.get("multicast.group.address[\"cluster-link-control\"]") | |
Integer port = GetterUtil.getInteger( | |
PropsUtil.get("multicast.group.port[\"cluster-link-control\"]") | |
); | |
Integer heartbeat = GetterUtil.getInteger( | |
PropsUtil.get("cluster.executor.heartbeat.interval"), | |
5 | |
); | |
Boolean gzipData = false; | |
Boolean shortData = true; | |
// Try a fairly long test duration - four times the heartbeat | |
int heartbeatMultiplier = 4; | |
int testDurationInMilliseconds = heartbeatMultiplier*heartbeat; | |
CustomMulticastDatagramHandler handler = new CustomMulticastDatagramHandler( | |
gzipData.booleanValue(), shortData.booleanValue()); | |
println "-----" | |
println "Created transport against multicast group " + host + ":" + port; | |
CustomMulticastTransport transport = new CustomMulticastTransport( | |
handler, host, port); | |
// if (shortData.booleanValue()) { | |
// println "Truncating to 96 bytes."; | |
// } | |
println "Started up heartbeat listener at " + DateUtil.newDate(); | |
transport.connect(); | |
ExecutorService service = Executors.newSingleThreadExecutor(); | |
try { | |
service.submit(transport).get(testDurationInMilliseconds, | |
TimeUnit.MILLISECONDS); | |
} | |
catch (TimeoutException e) { | |
//no-op, expected when the transport is timed out by the executor | |
} | |
finally { | |
transport.interrupt(); | |
service.shutdown(); | |
println "Shutdown heartbeat listener at " + DateUtil.newDate(); | |
println "-----" | |
} | |
String[] addresses = handler.getAddresses() | |
int numberSeen = addresses.length | |
println "Number of nodes observed through control traffic: " + numberSeen | |
println "Addresses seen: " + addresses | |
println "-----" | |
if ((numberSeen <= 1) || (numberSeen != numberOfNodes)) { | |
println "WARNING: clustering does not appear to be working correctly." | |
} | |
} | |
catch (Exception e) { | |
println "Exception " + e; | |
} | |
public class CustomMulticastDatagramHandler implements DatagramHandler { | |
public CustomMulticastDatagramHandler(boolean gzipData, boolean shortData) { | |
_gzipData = gzipData; | |
_shortData = shortData; | |
} | |
public void errorReceived(Throwable t) { | |
_log.error(t, t); | |
} | |
public void process(DatagramPacket packet) { | |
if (_log.isInfoEnabled()) { | |
byte[] bytes = packet.getData(); | |
if (_gzipData) { | |
try { | |
bytes = getUnzippedBytes(bytes); | |
} | |
catch (Exception e) { | |
_log.error(e, e); | |
} | |
} | |
if (_shortData) { | |
byte[] temp = new byte[96]; | |
System.arraycopy(bytes, 0, temp, 0, 96); | |
bytes = temp; | |
} | |
StringBundler sb = new StringBundler(4); | |
sb.append("["); | |
sb.append(packet.getSocketAddress()); | |
sb.append("] "); | |
sb.append(new String(bytes)); | |
_log.info(sb); | |
} | |
//CUSTOM | |
_socketAddresses.add(packet.getSocketAddress().toString()); | |
//CUSTOM END | |
} | |
//CUSTOM START | |
public String[] getAddresses() { | |
return _socketAddresses.toArray(); | |
} | |
//CUSTOM END | |
protected byte[] getUnzippedBytes(byte[] bytes) throws Exception { | |
InputStream is = new GZIPInputStream( | |
new UnsyncByteArrayInputStream(bytes)); | |
UnsyncByteArrayOutputStream ubaos = new UnsyncByteArrayOutputStream( | |
bytes.length); | |
byte[] buffer = new byte[1500]; | |
int c = 0; | |
while (true) { | |
if (c == -1) { | |
break; | |
} | |
c = is.read(buffer, 0, 1500); | |
if (c != -1) { | |
ubaos.write(buffer, 0, c); | |
} | |
} | |
is.close(); | |
ubaos.flush(); | |
ubaos.close(); | |
return ubaos.toByteArray(); | |
} | |
private static Log _log = LogFactory.getLog(MulticastDatagramHandler.class); | |
private boolean _gzipData; | |
private boolean _shortData; | |
//CUSTOM START | |
private Set<String> _socketAddresses = new TreeSet<String>(); | |
//CUSTOM END | |
} | |
public class CustomMulticastTransport extends Thread implements Transport { | |
public CustomMulticastTransport(DatagramHandler handler, String host, int port) { | |
//Not working in groovy script super("MulticastListener-" + host + port); | |
setDaemon(true); | |
_handler = handler; | |
_host = host; | |
_port = port; | |
} | |
public synchronized void connect() throws IOException { | |
if (_socket == null) { | |
_socket = new MulticastSocket(_port); | |
} | |
else if (_socket.isConnected() && _socket.isBound()) { | |
return; | |
} | |
_address = InetAddress.getByName(_host); | |
_socket.joinGroup(_address); | |
_connected = true; | |
start(); | |
} | |
public synchronized void disconnect() { | |
// Interrupt all processing | |
if (_address != null) { | |
try { | |
_socket.leaveGroup(_address); | |
_address = null; | |
} | |
catch (IOException ioe) { | |
_log.error("Unable to leave group", ioe); | |
} | |
} | |
_connected = false; | |
interrupt(); | |
_socket.close(); | |
} | |
public boolean isConnected() { | |
return _connected; | |
} | |
@Override | |
public void run() { | |
try { | |
while (_connected && !this.isInterrupted()) { | |
_socket.receive(_inboundPacket); | |
_handler.process(_inboundPacket); | |
} | |
} | |
catch (IOException ioe) { | |
_log.error("Unable to process ", ioe); | |
_socket.disconnect(); | |
_connected = false; | |
_handler.errorReceived(ioe); | |
} | |
} | |
public synchronized void sendMessage(byte[] bytes) throws IOException { | |
_outboundPacket.setData(bytes); | |
_outboundPacket.setAddress(_address); | |
_outboundPacket.setPort(_port); | |
_socket.send(_outboundPacket); | |
} | |
public synchronized void sendMessage(String message) throws IOException { | |
sendMessage(message.getBytes()); | |
} | |
private static Log _log = LogFactory.getLog(MulticastTransport.class); | |
private InetAddress _address; | |
private boolean _connected; | |
private DatagramHandler _handler; | |
private String _host; | |
private byte[] _inboundBuffer = new byte[4096]; | |
private DatagramPacket _inboundPacket = new DatagramPacket( | |
_inboundBuffer, _inboundBuffer.length); | |
private byte[] _outboundBuffer = new byte[4096]; | |
private DatagramPacket _outboundPacket = new DatagramPacket( | |
_outboundBuffer, _outboundBuffer.length); | |
private int _port; | |
private MulticastSocket _socket; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment