Skip to content

Instantly share code, notes, and snippets.

@huttj
Last active August 29, 2015 14:24
Show Gist options
  • Save huttj/1d0e3473bbc9a12c2dbc to your computer and use it in GitHub Desktop.
Save huttj/1d0e3473bbc9a12c2dbc to your computer and use it in GitHub Desktop.
RethinkDB union performance example
// Setup for the union-performance demo: creates database `test` with tables
// `one`, `two` and `three`, then fills every table with 300 documents for each
// of the uids 'a1', 'b2', 'c3' and 'd4'. Each inserted document carries only a
// `uid` field; the loop variable `i` is never used.
// NOTE(review): none of these ReQL statements call .run(conn) - presumably they
// are meant to be pasted into the RethinkDB Data Explorer rather than executed
// as a Node script; confirm before automating.
r.dbCreate('test');
r.db('test').tableCreate('one');
r.db('test').tableCreate('two');
r.db('test').tableCreate('three');
// uid 'a1': 300 documents in each of the three tables.
r.range(300).forEach(function(i) {
return r.db('test').table('one').insert({ uid: 'a1' })
});
r.range(300).forEach(function(i) {
return r.db('test').table('two').insert({ uid: 'a1' })
});
r.range(300).forEach(function(i) {
return r.db('test').table('three').insert({ uid: 'a1' })
});
// uid 'b2': 300 documents in each of the three tables.
r.range(300).forEach(function(i) {
return r.db('test').table('one').insert({ uid: 'b2' })
});
r.range(300).forEach(function(i) {
return r.db('test').table('two').insert({ uid: 'b2' })
});
r.range(300).forEach(function(i) {
return r.db('test').table('three').insert({ uid: 'b2' })
});
// uid 'c3': 300 documents in each of the three tables.
r.range(300).forEach(function(i) {
return r.db('test').table('one').insert({ uid: 'c3' })
});
r.range(300).forEach(function(i) {
return r.db('test').table('two').insert({ uid: 'c3' })
});
r.range(300).forEach(function(i) {
return r.db('test').table('three').insert({ uid: 'c3' })
});
// uid 'd4': 300 documents in each of the three tables.
r.range(300).forEach(function(i) {
return r.db('test').table('one').insert({ uid: 'd4' })
});
r.range(300).forEach(function(i) {
return r.db('test').table('two').insert({ uid: 'd4' })
});
r.range(300).forEach(function(i) {
return r.db('test').table('three').insert({ uid: 'd4' })
});
<html>
<head>
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/zepto/1.1.4/zepto.min.js"></script>
<script type="text/javascript" src="./socket.io/socket.io.js"></script>
</head>
<body>
<h2 style="display: inline-block">User: </h2>
<select id="user-select">
<option value="a1">a1</option>
<option value="b2">b2</option>
<option value="c3">c3</option>
<option value="d4">d4</option>
</select>
<script type="text/javascript">
// Page-wide Socket.IO connection; assigned once the DOM is ready.
var socket;
$(function() {
    // Ask the server for the given user's records.
    function requestUser(userId) {
        socket.emit('user', userId);
    }

    socket = io.connect();

    // Start with the first entry of the drop-down.
    requestUser('a1');

    // Mirror server-pushed messages into the browser console.
    socket.on("console", console.log.bind(console));
    socket.on('data', function(payload) {
        console.log(payload);
    });

    // Re-request data whenever a different user is picked.
    $('#user-select').on('change', function() {
        var selectedUser = this.value;
        console.log('selected new user:', selectedUser);
        requestUser(selectedUser);
    });
});
</script>
</body>
</html>
{
"name": "rethink-status",
"version": "0.0.1",
"description": "A simple RethinkDB cluster monitoring frontend demo with realtime graphs",
"main": "app.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"start": "node app.js"
},
"repository": {
"type": "git",
"url": "https://github.com/rethinkdb/rethink-status.git"
},
"author": "Ryan Paul",
"license": "MIT",
"bugs": {
"url": "https://github.com/rethinkdb/rethink-status/issues"
},
"homepage": "https://github.com/rethinkdb/rethink-status",
"dependencies": {
"bluebird": "^2.9.33",
"express": "^4.11.2",
"rethinkdb": "^1.16.0",
"socket.io": "^1.3.2"
}
}
var Promise = require('bluebird');
var express = require('express');
var socketio = require('socket.io');
var r = require('rethinkdb');
// Port the HTTP server (and the Socket.IO endpoint attached to it) listens on.
var PORT = 8080;

// Logger pre-bound to `console` so it can be handed around as a bare callback.
var log = console.log.bind(console);

// Express app that serves the files in this directory statically.
var app = express();
app.use(express.static(__dirname));

// Attach Socket.IO to the HTTP server returned by Express.
var server = app.listen(PORT);
var io = socketio.listen(server, {log: false});
console.log('Server started on port ' + PORT);

// Promise for the RethinkDB connection used by every query below.
// NOTE(review): `db: 'rethinkdb'` / `host: 'test'` look like they may be
// swapped (the queries name r.db('test') explicitly) and the port is passed
// as a string - confirm against the intended deployment.
var DB = r.connect({db: 'rethinkdb', host: 'test', port: '28015'});
DB.then(function (c) {
// Whether or not to use the faster query
var query = process.argv[2] ? goodUserQuery : slowUserQuery;
log('Ready; using', query.name);
io.on('connection', function onConnection(socket) {
log('client connected', socket.conn.remoteAddress);
// Handles a 'user' request from the client: drops whatever the socket was
// previously streaming, sends up to 100 matching rows per table as one 'data'
// message, then opens a changefeed and forwards every change as 'data'.
socket.on('user', function onUser(userId) {
    log('user', userId);

    // The timestamp doubles as a token for "the latest request on this socket";
    // late results from an older request are detected by comparing against it.
    var lastRequestTime = +new Date();
    socket.lastRequestTime = lastRequestTime;

    cancelSubscription();
    closeExistingCursor();

    query(userId, 100).run(c)
        .then(saveCursor)
        .then(toArray)
        .then(sendData)
        .then(subscribe)
        .catch(log);

    // True while no newer 'user' request has arrived on this socket.
    function isCurrentRequest() {
        return socket.lastRequestTime === lastRequestTime;
    }

    // Remember the result cursor so a later request / disconnect can close it.
    function saveCursor(cursor) {
        socket.cursor = cursor;
        return cursor;
    }

    // Emit the initial snapshot, or abort the chain if the request went stale.
    function sendData(data) {
        log('sending', data.length, 'records');
        if (isCurrentRequest()) {
            socket.emit('data', data);
        } else {
            throw new Error('Client changed userId before data came back');
        }
    }

    // Open the changefeed. The promise is returned so that a failure reaches
    // the `.catch(log)` at the end of the chain instead of going unhandled.
    function subscribe() {
        return query(userId)
            .changes().run(c)
            .then(sendChangeData);
    }

    // Forward changefeed items to the client.
    function sendChangeData(cursor) {
        if (!isCurrentRequest()) {
            // A newer request took over while this feed was opening; storing it
            // would overwrite (and leak) the newer subscription, so close it.
            return cursor.close();
        }
        log('change data being sent for user ' + userId);
        socket.subscription = cursor;
        cursor.each(function (err, item) {
            if (err) {
                // Report feed errors instead of emitting an undefined item.
                log(err);
                return;
            }
            socket.emit('data', item);
        });
    }
});
// When the client goes away, release the database resources held for this
// socket (result cursor first, then the changefeed) and log the departure.
socket.on('disconnect', function onDisconnect() {
    [closeExistingCursor, cancelSubscription].forEach(function (release) {
        release();
    });
    log('client disconnected', socket.conn.remoteAddress);
});
/**
 * Closes the result cursor stored on the socket (if any) and clears the
 * reference. Safe to call when no cursor is stored.
 */
function closeExistingCursor() {
    var cursor = socket.cursor;
    // Clear the reference first so it is gone even if close() throws.
    socket.cursor = null;
    if (cursor) {
        cursor.close()
            .then(log.bind(0, 'Closed cursor'))
            // Log a failed close instead of leaving the rejection unhandled.
            .catch(log);
    }
}
/**
 * Closes the changefeed cursor stored on the socket (if any) and clears the
 * reference. Safe to call when no subscription is stored.
 */
function cancelSubscription() {
    var feed = socket.subscription;
    // Clear the reference first so it is gone even if close() throws.
    socket.subscription = null;
    if (feed) {
        feed.close()
            .then(log.bind(0, 'Canceled subscription'))
            // Log a failed close instead of leaving the rejection unhandled.
            .catch(log);
    }
}
});
/**
 * Builds the "slow" variant of the per-user query: rows with the given uid
 * from tables `one`, `two` and `three` of database `test`, each wrapped via
 * `.map()` into `{ type: <table name>, data: <row> }`, combined with `.union()`.
 *
 * @param {string} userId  Value matched against each row's `uid` field.
 * @param {number} [limit] When truthy, caps the rows taken from EACH table.
 * @returns {*} The ReQL term returned by the final `.union()`; not yet run.
 */
function slowUserQuery(userId, limit) {
    if (limit) {
        log('beginning userQuery for user ' + userId + ' with limit of ' + limit);
    } else {
        log('no limit userQuery for user ' + userId);
    }

    // Every table is tagged with its own name (table `three` used to be
    // labelled 'rawdata_sensor', which matched nothing else in this demo).
    var tagged = ['one', 'two', 'three'].map(function (tableName) {
        var rows = r.db('test').table(tableName).filter({'uid': userId});
        if (limit) {
            rows = rows.limit(limit);
        }
        return rows.map(as(tableName));
    });

    return tagged.reduce(function (combined, next) {
        return combined.union(next);
    });

    // Returns a mapper that wraps a row as { type: s, data: row }.
    function as(s) {
        return function(n) {
            return {
                type: s,
                data: n
            };
        };
    }
}
/**
 * Builds the "good" variant of the per-user query: rows with the given uid
 * from tables `one`, `two` and `three` of database `test`, each tagged in
 * place with `.merge({ table: <table name> })`, combined with `.union()`.
 *
 * @param {string} userId  Value matched against each row's `uid` field.
 * @param {number} [limit] When truthy, caps the rows taken from EACH table.
 * @returns {*} The ReQL term returned by the final `.union()`; not yet run.
 */
function goodUserQuery(userId, limit) {
    var hasLimit = Boolean(limit);

    log(hasLimit
        ? 'beginning userQuery for user ' + userId + ' with limit of ' + limit
        : 'no limit userQuery for user ' + userId);

    // Filtered (and optionally limited) rows of one table, tagged with its name.
    function labelled(tableName) {
        var selection = r.db('test').table(tableName).filter({'uid': userId});
        var bounded = hasLimit ? selection.limit(limit) : selection;
        return bounded.merge({ table: tableName });
    }

    return labelled('one')
        .union(labelled('two'))
        .union(labelled('three'));
}
/**
 * Promise-chain adapter: calls `toArray()` on the given cursor and hands back
 * whatever that call returns.
 *
 * @param {{toArray: function}} c Cursor-like object exposing `toArray()`.
 * @returns {*} The result of `c.toArray()`.
 */
function toArray(c) {
    var collected = c.toArray();
    return collected;
}
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment