Created
August 7, 2015 23:18
-
-
Save allanwax/6cb5385737e671f76eb9 to your computer and use it in GitHub Desktop.
Scan a redis cluster in parallel
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.findology.util.jediscluster;

import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import redis.clients.jedis.*;
/** | |
* Created by allan.wax on 7/27/2015. | |
*/ | |
public abstract class ClusterScanner { | |
private static final Logger log = Logger.getLogger(ClusterScanner.class); | |
protected static final ExecutorService executor = Executors.newCachedThreadPool(); | |
protected static final HashMap<String, String> hostMap = new HashMap<>(); | |
private JedisCluster jedisCluster = null; | |
private static boolean TEST_MODE = false; | |
public ClusterScanner(JedisCluster jedisCluster) { | |
this.jedisCluster = jedisCluster; | |
} | |
public abstract void action(String key); | |
private String normalizeHostAndPort(String hostAndPort) { | |
try { | |
String hap = hostMap.get(hostAndPort); | |
if (hap == null) { | |
int colon = hostAndPort.lastIndexOf(':'); | |
InetAddress address = InetAddress.getByName(hostAndPort.substring(0, colon)); | |
hap = address.getHostAddress() + hostAndPort.substring(colon); | |
hostMap.put(hostAndPort, hap); | |
} | |
return hap; | |
} | |
catch (Exception e) { | |
return hostAndPort; | |
} | |
} | |
private class ScannerImpl implements Callable<Long> { | |
private String clusterInstance; | |
private String match; | |
public ScannerImpl(String clusterInstance, String match) { | |
this.clusterInstance = clusterInstance; | |
this.match = match; | |
} | |
/** | |
* Computes a result, or throws an exception if unable to do so. | |
* | |
* @return computed result | |
* @throws Exception if unable to compute a result | |
*/ | |
@Override | |
public Long call() throws Exception { | |
long count = 0; /* TEST_MODE */ | |
int scanCount = 0; | |
try { | |
String hostAndPort = normalizeHostAndPort(clusterInstance); | |
String[] parts = hostAndPort.split(":"); | |
String host = parts[0]; | |
int port = Integer.valueOf(parts[1]); | |
try (Jedis jedis = new Jedis(host, port)) { | |
ScanParams params = new ScanParams().match(match).count(100); | |
String scanMarker = "0"; | |
ScanResult<String> results = null; | |
do { | |
results = jedis.scan(scanMarker, params); | |
scanCount++; | |
List<String> keys = results.getResult(); | |
if (keys != null && keys.size() > 0) { | |
count += keys.size(); /* TEST_MODE */ | |
for (String key : keys) { | |
action(key); | |
} | |
} | |
scanMarker = results.getStringCursor(); | |
} while (!scanMarker.equals("0")); | |
} | |
if (TEST_MODE) { | |
log.info("Found " + count + " keys for " + hostAndPort + " in " + scanCount + " scans"); /* TEST_MODE */ | |
} | |
} | |
catch (Exception e) { | |
log.error("" + e); | |
} | |
return count; | |
} | |
} | |
public void scan(String match) { | |
scan(match, 15); | |
} | |
public void scan(String match, int maxSeconds) { | |
if (TEST_MODE) { | |
log.info("Start scan for '" + match + "'"); | |
} | |
Map<String, JedisPool> jedisPools = jedisCluster.getClusterNodes(); | |
String nodeList = null; | |
Exception screwed = null; | |
// get the list of nodes (masters and slaves) | |
for (JedisPool pool : jedisPools.values()) { | |
try { | |
Jedis j = null; | |
try { | |
j = pool.getResource(); | |
nodeList = j.clusterNodes(); | |
} | |
catch (Exception e1) { | |
screwed = e1; | |
j.close(); | |
j = null; | |
continue; | |
} | |
finally { | |
if (j != null) { | |
j.close(); | |
break; | |
} | |
} | |
} | |
catch (Exception e) { | |
// DO SOMETHING | |
} | |
} | |
if (nodeList == null) { | |
log.error("The cluster is screwed. Can't use any members.", screwed); | |
return; | |
} | |
String[] nodes = nodeList.split("\n"); | |
ArrayList<ScannerImpl> scanners = new ArrayList<>(); | |
// pick out the masters | |
for (String node : nodes) { | |
String[] info = node.split("\\s+"); | |
if (info[2].indexOf("fail") >= 0) { | |
continue; | |
} | |
if (info[2].indexOf("handshake") >= 0) { | |
continue; | |
} | |
if (info[2].indexOf("master") >= 0) { | |
scanners.add(new ScannerImpl(info[1], match)); | |
} | |
} | |
if (!scanners.isEmpty()) { | |
try { | |
executor.invokeAll(scanners, maxSeconds, TimeUnit.SECONDS); | |
} | |
catch (Exception e) { | |
log.error(e); | |
} | |
} | |
} | |
/* TEST */ | |
public static void main(String[] args) { | |
TEST_MODE = true; | |
Logger rootLogger = Logger.getRootLogger(); | |
if (!rootLogger.getAllAppenders().hasMoreElements()) { | |
rootLogger.setLevel(Level.INFO); | |
rootLogger.addAppender(new ConsoleAppender(new org.apache.log4j.PatternLayout("%d{yyyy-MM-dd HH:mm:ss.SSS} [%t; %C{1}] %-5p -- %m%n"))); | |
} | |
Set<HostAndPort> jcNodes = new HashSet<>(); | |
jcNodes.add(new HostAndPort("test2", 17000)); | |
final JedisCluster jc = new JedisCluster(jcNodes); | |
final AtomicInteger ai = new AtomicInteger(0); | |
ClusterScanner scanner = new ClusterScanner(jc) { | |
@Override | |
public void action(String key) { | |
ai.incrementAndGet(); | |
} | |
}; | |
String scanFor = "*"; | |
long start = System.currentTimeMillis(); | |
scanner.scan(scanFor); | |
log.info("Scan for '" + scanFor + "' found " + ai + " keys in " + ((System.currentTimeMillis() - start) / 1000.0) + " seconds"); | |
System.exit(0); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment