Created
July 18, 2010 01:29
-
-
Save alphazero/480022 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2010 Joubin Houshyar | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package test.net; | |
import java.io.BufferedReader; | |
import java.io.DataOutputStream; | |
import java.io.InputStreamReader; | |
import java.net.ConnectException; | |
import java.net.Socket; | |
import java.net.SocketTimeoutException; | |
import java.net.UnknownHostException; | |
import java.util.Random; | |
/**
 * Demo a fault-tolerant connection.
 * Both dropped connections and server downtime are handled.
 * <p>
 * To run the test, set in your redis.conf:
 * <p>
 * set the appendonly so that when we crash redis,
 * the counter values are correctly reflected in the db file.
 * <pre><code>appendonly yes</code></pre>
 * <p>
 * set the timeout to 1 sec to force frequent idle client
 * timeouts.
 * <pre><code>timeout 1</code></pre>
 *
 * @author joubin ([email protected])
 * @date Jul 17, 2010
 *
 */
public class FaultTolerantConnector {

    /** Run the test */
    public static void main (String[] args) {
        try {
            new FaultTolerantConnector().run();
        }
        catch (ConnectException e) {
            System.err.format("Failed to connect to server: %s\n", e);
            e.printStackTrace();
        }
        catch (Throwable e) {
            System.err.format("Fault: %s\n", e);
            e.printStackTrace();
        }
    }

    // ========================================================================

    /** your server's password. null will skip AUTH */
    String password = "jredis";
    /** current socket to the server; replaced on every (re)connect */
    Socket s = null;
    /** request stream of the current socket */
    DataOutputStream dout = null;
    /** response stream of the current socket */
    BufferedReader din = null;
    /** time (msecs) of the last successful connection check; 0 forces a check */
    long lastchecktime;
    /** true once the handshake in {@link #connect()} has completed */
    boolean isConnected = false;

    /**
     * Perpetually runs an INCR sequence. Delay between batches is chosen to be
     * much bigger than the redis.conf:timeout settings. (Tested with 1 sec).
     * <p>
     * A local counter is kept in step with the server's INCR replies; any
     * divergence raises an {@link IllegalStateException}.
     *
     * @throws Exception if the server stays down longer than the tolerated
     *         downtime, or the local and remote counters diverge
     */
    public void run() throws Exception {
        long maxDowntime = 1000 * 60;
        long delay = 5000;          // >> redis.conf::timeout (1 second)
        long redismaxidle = 1000;   // matches redis.conf::timeout

        connect();

        long n = 0;
        long reps = 10000;
        long start = System.currentTimeMillis();
        String guidKey = String.format("test:%X", new Random(System.currentTimeMillis()).nextLong());
        System.out.format("INCR key: %s\n", guidKey);
        DEL(guidKey);
        long cntr = 0;

        for(;;){
            // make sure connection is alive
            // tolerate up to maxDowntime msecs while Redis comes back up
            //
            if(System.currentTimeMillis() - lastchecktime > redismaxidle)
                assertConnectionEstablished(maxDowntime, false);

            // do something destructive
            //
            try {
                long rcntr = INCR(guidKey);
                cntr++;
                // check our cntr against redis rcntr to see if the faults are introducing errors or not
                if(rcntr != cntr)
                    throw new IllegalStateException(String.format("Inconsistent state: cntr @ %d and the INCR response of %d", cntr, rcntr));
                n++;
            }
            catch (FaultOnIO e){
                if(e.op == FaultOnIO.Op.W) {
                    // can happen if Redis shuts down
                    // just when connector is about to send the request
                    System.err.format("WARNING: last Op's request raised a connection fault -- assuming Op NOT processed.\n");
                }
                else {
                    // can happen if Redis shuts down
                    // just when its about to send the response
                    System.err.format("WARNING: last Op's response raised a connection fault -- ");
                    if(e.getCause() == null) {
                        // plain end-of-stream: no response was ever produced
                        System.err.format("assuming Op NOT processed.\n");
                    }
                    else {
                        System.err.format("assuming Op processed.\n");
                        // increment counter to test our assumption
                        cntr ++;
                    }
                }
                // force a connection check before the next request
                lastchecktime = 0;
                continue;
            }

            // report once exactly 'reps' successful INCRs have completed
            if(n >= reps) {
                long delta = System.currentTimeMillis() - start;
                System.out.format("%d INCRs in %d msecs\n", reps, delta);

                // delay to force idle client disconnect
                // (if delay >> redis.conf::timeout)
                takeANap (delay);
                start = System.currentTimeMillis();
                n = 0;
            }
        }
    }

    /**
     * Blocks until the connection has been verified (reconnecting if needed),
     * retrying for at most maxDowntime msecs.
     *
     * @param maxDowntime longest tolerated outage, in msecs
     * @param usePings true to probe with PING, false to probe with a 1 msec read
     * @throws Exception if no connection could be established within maxDowntime;
     *         the last failure is attached as the cause
     */
    private final void assertConnectionEstablished (long maxDowntime, boolean usePings) throws Exception{
        long start = System.currentTimeMillis();
        for(;;) {
            try {
                if(usePings)
                    testConnectionWithPing();
                else
                    testConnection();
                lastchecktime = System.currentTimeMillis();
                break;
            }
            catch (Exception e){
                isConnected = false;
                long downtime = System.currentTimeMillis() - start;
                if(downtime > maxDowntime){
                    String errmsg = String.format("Could not re-establish connection -- server down more than %d msecs.", maxDowntime);
                    // keep the last failure so the reason for the outage is not lost
                    throw new Exception (errmsg, e);
                }
            }
        }
    }

    /**
     * Sleeps for the given number of msecs. If the thread is interrupted the
     * nap ends early and the interrupt flag is restored for the caller.
     *
     * @param delay msecs to sleep
     */
    private final void takeANap (long delay) {
        try {
            Thread.sleep(delay);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Probes the connection with a 1 msec read and reconnects if it is gone.
     * <p>
     * Works fine if used in conjunction with a timestamp that keeps track of the
     * last assert time. If only invoked when the last assert time is greater than
     * the redis client idle time setting (redis.conf::timeout) the RTT of this op
     * is absorbed by all the requests made during that idle time window. A balanced
     * setting of redis.conf timeout parameter would extend the number of these
     * requests to the point where the cost is effectively negligible.
     *
     * @throws Exception if the connection is gone and reconnecting failed
     */
    public void testConnection() throws Exception{
        boolean alive;
        try {
            s.setSoTimeout(1);
            try {
                // end-of-stream means the server closed the connection
                alive = din.read() != -1;
            }
            finally {
                // back to blocking reads for the regular request/response cycle
                s.setSoTimeout(0);
            }
        }
        catch (SocketTimeoutException e) {
            /* good news - will happen if connection is alive */
            alive = true;
        }
        catch (Exception e) {
            // any other failure (reset, socket already closed, ...) means this
            // socket is unusable; retrying on it cannot succeed, so reconnect
            alive = false;
        }
        if(alive)
            return;

        if(isConnected){
            System.err.format("WARNING: Connection has been dropped\n");
            isConnected = false;
        }
        else {
            // progress marker while the server is still down
            System.err.format(".");
        }
        connect();
        System.err.format("\n");
    }

    /**
     * Probes the connection with a PING and reconnects if the PING fails.
     * <p>
     * Works but not entirely reliably. We're paying PING/PONG round trip cost
     * (~100 microsecs) per request. Much better than the other method but it
     * seems to still miss dropped connections at times. (Not sure why).
     *
     * @throws Exception if the PING failed and reconnecting failed
     */
    public void testConnectionWithPing() throws Exception{
        try {
            ping();
        }
        catch (Exception e) {
            System.out.format("WARNING: Connection has been dropped\n");
            try {
                connect();
            }
            catch (Exception e2){
                System.err.format("ERROR: failed to reconnect; %s\n", e2.getMessage());
                throw new Exception ("Cannot connect to Redis server", e2);
            }
        }
    }

    /**
     * Opens a fresh connection to localhost:6379, authenticates (when a
     * password is set) and selects db 13. Any previously opened socket is
     * closed first, and a socket whose handshake fails is closed as well, so
     * repeated reconnect attempts do not leak sockets.
     *
     * @throws Exception if the socket cannot be opened or the handshake fails
     */
    public void connect() throws RuntimeException, UnknownHostException, Exception{
        // release the previous (dead) connection, if any
        closeQuietly(s);

        Socket fresh = new Socket("localhost", 6379);
        try {
            fresh.setSoTimeout(0);
            // AUTH/SELECT go through the fields, so publish the streams first
            s = fresh;
            dout = new DataOutputStream(fresh.getOutputStream());
            din = new BufferedReader(new InputStreamReader(fresh.getInputStream()));
            if(password != null)
                AUTH (password);
            SELECT (13);
        }
        catch (Exception e) {
            closeQuietly(fresh);
            throw e;
        }
        isConnected = true;
        System.out.format("INFO: Connection established.\n");
    }

    /** Closes the socket (if any); a failure to close is ignored. */
    private static void closeQuietly (Socket socket) {
        if(socket == null)
            return;
        try {
            socket.close();
        }
        catch (Exception ignored) {
            // best effort: nothing useful can be done if close itself fails
        }
    }

    // ------ client API
    // a simple redis client impl.
    //

    /**
     * Sends the raw request bytes to the server.
     *
     * @param bytes the encoded request
     * @throws FaultOnIO with op W and the underlying cause if the write fails
     */
    void request(byte[] bytes) throws FaultOnIO {
        try {
            dout.write(bytes);
        }
        catch (Throwable t){
            throw new FaultOnIO(FaultOnIO.Op.W, t);
        }
    }

    /**
     * Reads one response line from the server.
     *
     * @return the response line, never null
     * @throws FaultOnIO with op R; the cause is set when the read itself
     *         failed and is null when the stream simply ended
     */
    String getLineResponse() throws FaultOnIO {
        String resp;
        try {
            resp = din.readLine();
        }
        catch (Throwable t){
            throw new FaultOnIO(FaultOnIO.Op.R, t);
        }
        // checked after the try (not in a finally) so that it cannot replace
        // the cause-bearing fault raised above
        if(resp == null)
            throw new FaultOnIO(FaultOnIO.Op.R);
        return resp;
    }

    /** Parses a single-line reply by dropping its one-character type prefix. */
    private static long parseIntegerReply (String resp) {
        return Long.parseLong(resp.substring(1));
    }

    /** Sends PING and consumes the reply line. */
    public void ping () throws Exception {
        request ("PING\r\n".getBytes());
        getLineResponse();
    }

    /**
     * Sends INCR for the key.
     *
     * @param key the counter key
     * @return the number carried in the reply
     */
    public long INCR (String key) throws Exception {
        request (String.format("INCR %s\r\n", key).getBytes());
        return parseIntegerReply(getLineResponse());
    }

    /**
     * Sends DEL for the key.
     *
     * @param key the key to delete
     * @return the number carried in the reply
     */
    public long DEL (String key) throws Exception {
        request (String.format("DEL %s\r\n", key).getBytes());
        return parseIntegerReply(getLineResponse());
    }

    /** Sends AUTH with the password and consumes the reply line (content not inspected). */
    public void AUTH (String password) throws Exception {
        request(String.format("AUTH %s\r\n", password).getBytes());
        getLineResponse();
    }

    /** Sends SELECT for db n and consumes the reply line (content not inspected). */
    public void SELECT (int n) throws Exception {
        request(String.format("SELECT %d\r\n", n).getBytes());
        getLineResponse();
    }

    /** Sends FLUSHDB and consumes the reply line (content not inspected). */
    public void FLUSH () throws Exception {
        request("FLUSHDB\r\n".getBytes());
        getLineResponse();
    }

    /**
     * Raised when a request write (op W) or a response read (op R) fails.
     * For reads, a null cause means the stream ended without a response.
     */
    @SuppressWarnings("serial")
    static class FaultOnIO extends Exception {
        public enum Op { R, W};
        final public Op op;
        public FaultOnIO (Op op) {super(); this.op = op;}
        public FaultOnIO (Op op, Throwable cause) {super(cause); this.op = op;}
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Added a simple integrity check for the destructive INCR op. It's far more likely to induce a broken pipe on write if the "delay" in the above program is set to 0. You should see a few FaultOnIO thrown if you keep pulling the plug on Redis. On Redis restart the counter should be at the expected value, confirming the integrity of the fault handling.