Skip to content

Instantly share code, notes, and snippets.

@nirbhayc
Created July 15, 2016 21:17
Show Gist options
  • Save nirbhayc/6864b4c772235459455a6b7cea34aa21 to your computer and use it in GitHub Desktop.
Save nirbhayc/6864b4c772235459455a6b7cea34aa21 to your computer and use it in GitHub Desktop.
package syncWrites;
/**
* Check Galera discrepancies in sync writes (parameter wsrep_sync_wait)
*
* Run:
* scp Java/bin/syncWrite/* USER@HOST:syncWrite/Java/syncWrite; ssh USER@HOST '(cd syncWrite/Java; java -cp ".:mysql-connector-java-5.1.39-bin.jar" syncWrite.simplified --pass *** --db test --nodes 10.9.202.189,100.64.6.114,10.9.197.46 --sw 7)'
*/
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
 * Single-threaded probe for Galera "sync wait" (causal read) discrepancies.
 *
 * <p>Each cycle writes an incremented counter to one row of table {@code snc}
 * through the first configured node (the "source"). Then, for every other node,
 * it re-reads the counter from the source and right afterwards from that node.
 * When the node returns a smaller value than the source just did, the
 * discrepancy is printed together with a few wsrep status counters and the JVM
 * exits with status 1.
 *
 * <p>The statements below use the columns {@code pk}, {@code val} and
 * {@code ts} of {@code snc}; the table is not created by this program.
 */
public class simplified {

    /** Run-time settings collected from the command line by {@code parseArgs}. */
    static class Cfg {
        /** Database user name. */
        String user = "root";
        /** Database password (mandatory). */
        String pass;
        /** Database name used in the JDBC URL (mandatory). */
        String dbName;
        /** Node addresses; element 0 is written to, the others are only read. */
        String[] nodes;
        /** Pause between cycles in milliseconds; 0 means no pause. */
        int delay = 0;
        /** Value applied to the session variable {@code wsrep_sync_wait} on every new connection. */
        String wsrep_sync_wait = "0";
        /** Primary key of the {@code snc} row that is updated and queried. */
        String pk = "1";
    }

    /** Per-node bookkeeping: address, cached connection and the largest lags observed. */
    static class NodeState {
        String addr;
        /** Opened on first use; reset to {@code null} after an SQL error so the next use reconnects. */
        Connection conn;
        /** Largest {@code TIMESTAMPDIFF(SECOND, ts, NOW())} seen on this node, in seconds. */
        int maxTsDiff = 0;
        /** Largest positive (source - node) counter difference seen; starts negative. */
        int maxValDiff = -1;

        NodeState(String addr) {
            this.addr = addr;
        }

        @Override
        public String toString() {
            return "NodeState [addr=" + addr + ", conn=" + conn
                    + ", maxTsDiff=" + maxTsDiff + ", maxValDiff=" + maxValDiff
                    + "]";
        }
    }

    /** Returns the current local time formatted as a log-line prefix. */
    static String tsPrefix() {
        // A fresh formatter per call: SimpleDateFormat instances must not be shared between threads.
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss: ").format(new Date());
    }

    /** Prints {@code s} and a line break, without a timestamp. */
    static void printlnRaw(String s) {
        System.out.println(s);
    }

    /** Prints formatted output, without a timestamp. */
    static void printfRaw(String fmt, Object... args) {
        System.out.printf(fmt, args);
    }

    /** Prints {@code s} and a line break, prefixed with {@link #tsPrefix()}. */
    static void println(String s) {
        System.out.print(tsPrefix());
        System.out.println(s);
    }

    /**
     * Prints formatted output prefixed with {@link #tsPrefix()}, mirroring
     * {@link #println(String)}; use {@link #printfRaw} for output without a timestamp.
     */
    static void printf(String fmt, Object... args) {
        System.out.print(tsPrefix());
        System.out.printf(fmt, args);
    }

    static Cfg cfg;
    static NodeState[] nodeStates;
    /** Counter value most recently read back from the source after an update; -1 before the first one. */
    static volatile int latestVal = -1;
    static NodeState sourceNode = null;

    /**
     * Parses the command line and runs the write/verify cycle until a discrepancy
     * is found (exit status 1) or an SQL error on the source node ends the program.
     *
     * @param args command-line options, see {@link #parseArgs(String[])}
     * @throws SQLException if connecting to, updating or reading the source node fails
     * @throws InterruptedException if the pause between cycles is interrupted
     */
    public static void main(String[] args) throws SQLException, InterruptedException {
        System.out.println("++ Single Threaded ++");
        simplified.cfg = parseArgs(args);
        if (simplified.cfg == null) {
            return;
        }

        simplified.nodeStates = new NodeState[cfg.nodes.length];
        for (int k = 0; k < cfg.nodes.length; k++) {
            nodeStates[k] = new NodeState(cfg.nodes[k]);
        }
        sourceNode = nodeStates[0];
        System.out.println("Source Node: " + sourceNode.toString());

        while (true) {
            increment(sourceNode, cfg.pk);
            for (int i = 1; i < nodeStates.length; i++) {
                queryAndReport(nodeStates[i], cfg.pk);
            }
            // Honor the documented --sleep option between cycles.
            if (cfg.delay > 0) {
                Thread.sleep(cfg.delay);
            }
        }
    }

    /**
     * Opens an auto-commit, READ COMMITTED connection to {@code nodeAddr} and sets
     * the session variable {@code wsrep_sync_wait} to the configured value.
     *
     * @param nodeAddr host (optionally host:port) placed into the JDBC URL
     * @return a ready-to-use connection
     * @throws SQLException if connecting or configuring the session fails; the
     *         half-configured connection is closed before the error propagates
     */
    static Connection getConnection(String nodeAddr) throws SQLException {
        String url = "jdbc:mysql://" + nodeAddr + "/" + simplified.cfg.dbName;
        System.out.println("Getting connection from " + url);
        Connection conn = DriverManager.getConnection(url, simplified.cfg.user, simplified.cfg.pass);
        boolean ready = false;
        try {
            conn.setAutoCommit(true);
            conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
            Statement stmt = conn.createStatement();
            try {
                // An earlier revision set wsrep_causal_reads here instead of wsrep_sync_wait.
                stmt.execute("set session wsrep_sync_wait=" + cfg.wsrep_sync_wait);
            } finally {
                closeQuietly(stmt);
            }
            ready = true;
            return conn;
        } finally {
            if (!ready) {
                closeQuietly(conn);
            }
        }
    }

    /** Returns the counter value most recently stored by {@link #setLatestValue(int)}. */
    static synchronized int getLatestValue() {
        return simplified.latestVal;
    }

    /**
     * Reads the counter of the configured row ({@code cfg.pk}) from {@code nodeState},
     * connecting first if the node has no cached connection.
     *
     * @param nodeState node to read from
     * @return the current {@code val}
     * @throws SQLException if the query fails or the row does not exist
     */
    static synchronized int readFromSource(NodeState nodeState) throws SQLException {
        return readFromSource(nodeState, simplified.cfg.pk);
    }

    /** Same as {@link #readFromSource(NodeState)} but for an explicit primary key. */
    private static synchronized int readFromSource(NodeState nodeState, String pk) throws SQLException {
        return readVal(connectionOf(nodeState), pk);
    }

    /** Records {@code newvalue} as the latest counter value written to the source. */
    static synchronized void setLatestValue(int newvalue) {
        simplified.latestVal = newvalue;
    }

    /**
     * Writes {@code getLatestValue() + 1} into row {@code pk} on {@code nodeState},
     * reads the value back from the same row and remembers it as the latest value.
     *
     * @param nodeState node to write to (the source)
     * @param pk primary key of the row to update
     * @return the value read back after the update
     * @throws SQLException if the update or read-back fails, or the row does not exist
     */
    static int increment(NodeState nodeState, String pk) throws SQLException {
        Connection conn = connectionOf(nodeState);
        int updatedval = getLatestValue() + 1;

        PreparedStatement update = conn.prepareStatement("update snc set val=?, ts=default where pk=?");
        try {
            update.setInt(1, updatedval);
            update.setString(2, pk);
            update.executeUpdate();
        } finally {
            closeQuietly(update);
        }
        // commit() is an error on an auto-commit connection (which getConnection creates),
        // so only commit explicitly when the connection is in manual-commit mode.
        if (!conn.getAutoCommit()) {
            conn.commit();
        }

        int newval = readVal(conn, pk);
        setLatestValue(newval);
        return newval;
    }

    /**
     * Compares the counter on {@code nodeState} with the counter on the source node.
     *
     * <p>Order matters: the source is read first and the node second, so a node that
     * honors the sync wait must return a value at least as large. If it returns less,
     * the discrepancy and some wsrep status counters are printed and the JVM exits
     * with status 1. SQL errors are reported and the failing node's connection is
     * dropped so that the next call reconnects.
     *
     * @param nodeState node to check
     * @param pk primary key of the row to compare
     * @throws SQLException declared for compatibility; SQL errors are handled internally
     */
    static void queryAndReport(NodeState nodeState, String pk) throws SQLException {
        try {
            Connection conn = connectionOf(nodeState);

            // Check the session variables before actually making the query.
            warnIfZero(conn, "select @@session.wsrep_sync_wait;",
                    "Session variable 'wsrep_sync_wait' is NOT set to 1");
            warnIfZero(conn, "select @@session.wsrep_causal_reads;",
                    "Session variable 'wsrep_causal_reads' is NOT ON");

            // Read from the source first. A failure here belongs to the source node,
            // not to the node being checked.
            int valFromSource;
            try {
                valFromSource = readFromSource(sourceNode, pk);
            } catch (SQLException e) {
                printf("++ exception on %s: %s\n", sourceNode.addr, e.getMessage());
                dropConnection(sourceNode);
                return;
            }

            int tsDiff;
            int currentVal;
            PreparedStatement rowQuery = conn.prepareStatement(
                    "select TIMESTAMPDIFF(SECOND, ts, NOW()), val from snc where pk=?");
            try {
                rowQuery.setString(1, pk);
                ResultSet rs = rowQuery.executeQuery();
                if (!rs.next()) {
                    throw new SQLException("no row in snc with pk=" + pk);
                }
                tsDiff = rs.getInt(1);
                currentVal = rs.getInt(2);
            } finally {
                closeQuietly(rowQuery); // also closes the result set
            }

            // Report
            nodeState.maxTsDiff = Math.max(nodeState.maxTsDiff, tsDiff);
            int valDiff = valFromSource - currentVal;
            if (valDiff > 0) {
                nodeState.maxValDiff = Math.max(nodeState.maxValDiff, valDiff);
                // The status counters are only needed for the report, so fetch them here.
                Map<String, Integer> sesStat = readSessionStatus(conn);
                printf("%15s: val diff: %3d, src: %d, curr: %d, max ts diff %d sec, recvQ max: %3d, sendQ max: %2d, pause %1.6f\n",
                        nodeState.addr,
                        valDiff,
                        valFromSource,
                        currentVal,
                        nodeState.maxTsDiff, // already in seconds
                        statOrZero(sesStat, "wsrep_local_recv_queue_max"),
                        statOrZero(sesStat, "wsrep_local_send_queue_max"),
                        statOrZero(sesStat, "wsrep_flow_control_paused") / 1000000f
                );
                System.exit(1);
            }
        } catch (SQLException e) {
            printf("++ exception on %s: %s\n", nodeState.addr, e.getMessage());
            dropConnection(nodeState);
        }
    }

    /**
     * Parses the command line.
     *
     * @param args raw arguments; every option except {@code -h/--help} takes one value
     * @return the populated configuration, or {@code null} when help was requested or
     *         the arguments are invalid (a message has been printed in either case)
     */
    static Cfg parseArgs(String[] args) {
        Cfg cfg = new Cfg();
        boolean help = false, err = false;

        for (int k = 0; k < args.length; k++) {
            String opt = args[k];
            if (opt.equals("-h") || opt.equals("--help")) {
                help = true;
                continue;
            }
            char key = optionKey(opt);
            if (key == '\0') {
                printfRaw("Unknown parameter %s\n", opt);
                err = true;
                continue;
            }
            if (k + 1 >= args.length) {
                printfRaw("Missing value for parameter %s\n", opt);
                err = true;
                continue;
            }
            String value = args[++k];
            switch (key) {
                case 's':
                    try {
                        cfg.delay = Integer.parseInt(value);
                        if (cfg.delay < 0) {
                            throw new NumberFormatException("negative");
                        }
                    } catch (NumberFormatException e) {
                        printfRaw("Bad --sleep value %s, expecting a non-negative number of milliseconds\n", value);
                        err = true;
                    }
                    break;
                case 'n': cfg.nodes = value.split(","); break;
                case 'd': cfg.dbName = value; break;
                case 'u': cfg.user = value; break;
                case 'p': cfg.pass = value; break;
                case 'w': cfg.wsrep_sync_wait = value; break;
                case 'k': cfg.pk = value; break;
                default: break; // unreachable: optionKey only returns the keys above
            }
        }

        if (help) {
            printfRaw("Sync write tester (simplified)\n");
            printfRaw("Params:\n");
            printfRaw(" -h, --help: this help\n");
            printfRaw(" -n, --nodes: comma-separated list of cluster node hostnames or ips, for ex.: node1,node2,node3\n");
            printfRaw(" -d, --db: database name, mandatory\n");
            printfRaw(" -u, --user: user name, default is %s\n", cfg.user);
            printfRaw(" -p, --pass: password, mandatory\n");
            printfRaw(" -s, --sleep: sleep delay between cycles in milliseconds, default is %d\n", cfg.delay);
            printfRaw(" -w, --sw: wsrep_sync_wait parameter, 0 to 7, default is %s\n", cfg.wsrep_sync_wait);
            printfRaw(" -k, --pk: PK of the record to update/query, default is %s\n", cfg.pk);
            return null;
        }

        if (!err) {
            if (cfg.dbName == null || cfg.dbName.length() == 0) { printfRaw("Mandatory parameter --db is missing\n"); err = true; }
            if (cfg.pass == null || cfg.pass.length() == 0) { printfRaw("Mandatory parameter --pass is missing\n"); err = true; }
            if (cfg.nodes == null || cfg.nodes.length == 0) { printfRaw("Mandatory parameter --nodes is missing\n"); err = true; }
            Integer syncWait = parseSyncWait(cfg.wsrep_sync_wait);
            if (syncWait == null) {
                printfRaw("Bad --sw value, expecting 0 to 7\n");
                err = true;
            } else {
                // Store the normalized number; it is later appended to a SET statement.
                cfg.wsrep_sync_wait = syncWait.toString();
            }
        }
        return err ? null : cfg;
    }

    /**
     * Maps an option spelling to a one-character key, or {@code '\0'} if unknown.
     * The database option accepts both {@code -d} (as documented in the help text)
     * and the historical {@code -b}.
     */
    private static char optionKey(String opt) {
        if (opt.equals("-s") || opt.equals("--sleep")) return 's';
        if (opt.equals("-n") || opt.equals("--nodes")) return 'n';
        if (opt.equals("-d") || opt.equals("-b") || opt.equals("--db")) return 'd';
        if (opt.equals("-u") || opt.equals("--user")) return 'u';
        if (opt.equals("-p") || opt.equals("--pass")) return 'p';
        if (opt.equals("-w") || opt.equals("--sw")) return 'w';
        if (opt.equals("-k") || opt.equals("--pk")) return 'k';
        return '\0';
    }

    /** Returns {@code value} as an integer in 0..7, or {@code null} if it is not one. */
    private static Integer parseSyncWait(String value) {
        try {
            int parsed = Integer.parseInt(value);
            return (parsed >= 0 && parsed <= 7) ? Integer.valueOf(parsed) : null;
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Returns the node's cached connection, opening it on first use. */
    private static Connection connectionOf(NodeState nodeState) throws SQLException {
        if (nodeState.conn == null) {
            nodeState.conn = getConnection(nodeState.addr);
        }
        return nodeState.conn;
    }

    /** Reads {@code val} of row {@code pk} from {@code snc}; fails if the row is missing. */
    private static int readVal(Connection conn, String pk) throws SQLException {
        PreparedStatement query = conn.prepareStatement("select val from snc where pk=?");
        try {
            query.setString(1, pk);
            ResultSet rs = query.executeQuery();
            if (!rs.next()) {
                throw new SQLException("no row in snc with pk=" + pk);
            }
            return rs.getInt(1);
        } finally {
            closeQuietly(query); // also closes the result set
        }
    }

    /** Runs a single-value query and prints {@code warning} to stderr when the value is "0". */
    private static void warnIfZero(Connection conn, String sql, String warning) throws SQLException {
        String value = "";
        Statement stmt = conn.createStatement();
        try {
            ResultSet rs = stmt.executeQuery(sql);
            while (rs.next()) {
                value = rs.getString(1);
            }
        } finally {
            closeQuietly(stmt);
        }
        if (value != null && value.trim().equals("0")) {
            System.err.println(warning);
        }
    }

    /**
     * Fetches selected wsrep session status counters keyed by lower-cased name.
     * {@code wsrep_flow_control_paused} is a fraction and is stored in millionths.
     */
    private static Map<String, Integer> readSessionStatus(Connection conn) throws SQLException {
        Map<String, Integer> stats = new HashMap<String, Integer>(10);
        Statement stmt = conn.createStatement();
        try {
            ResultSet rs = stmt.executeQuery(
                    "select lower(variable_name) as name, variable_value as val "
                    + "from information_schema.session_status "
                    + "where variable_name in('WSREP_LOCAL_RECV_QUEUE', 'WSREP_LOCAL_SEND_QUEUE', 'WSREP_LOCAL_REPLAYS', "
                    + "'WSREP_LOCAL_RECV_QUEUE_MAX', 'WSREP_LOCAL_SEND_QUEUE_MAX', 'WSREP_FLOW_CONTROL_PAUSED') "
                    + "order by variable_name");
            while (rs.next()) {
                String name = rs.getString(1);
                int val = name.equals("wsrep_flow_control_paused")
                        ? Math.round(rs.getFloat(2) * 1000000)
                        : rs.getInt(2);
                stats.put(name, val);
            }
        } finally {
            closeQuietly(stmt);
        }
        return stats;
    }

    /** Returns the counter stored under {@code name}, or 0 when the server did not report it. */
    private static int statOrZero(Map<String, Integer> stats, String name) {
        Integer value = stats.get(name);
        return value == null ? 0 : value.intValue();
    }

    /** Forgets and closes the node's cached connection so the next use reconnects. */
    private static void dropConnection(NodeState nodeState) {
        Connection stale = nodeState.conn;
        nodeState.conn = null;
        closeQuietly(stale);
    }

    /** Closes {@code stmt} if non-null, ignoring close failures. */
    private static void closeQuietly(Statement stmt) {
        if (stmt == null) {
            return;
        }
        try {
            stmt.close();
        } catch (SQLException ignored) {
            // Best-effort cleanup: a failure to close must not mask the primary result or error.
        }
    }

    /** Closes {@code conn} if non-null, ignoring close failures. */
    private static void closeQuietly(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (SQLException ignored) {
            // Best-effort cleanup: the connection is being discarded anyway.
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment