Created
February 3, 2017 22:54
-
-
Save mclark-newvistas/74a0a960e3e6d4b35455580b2acfc0fb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var r = require('rethinkdb'); | |
// The caller must pass in an already-open RethinkDB connection (`conn`) and a deepstream client (`client`); creating them is trivial and left to the caller.
// This is a rewrite of deepstream.io-provider-search-rethinkdb.
module.exports = function(conn, client) { | |
var searches = new Map(); | |
var pk = 'ds_id'; | |
/**
 * Build a nested ReQL row accessor from a path string such as "a.b[0].c".
 *
 * The path is split on '.', '[' and ']'; pieces that are empty or only
 * whitespace are dropped. The first remaining piece is handed to `r.row`,
 * and each later piece is applied to the previous accessor in turn. Pieces
 * are passed through as strings exactly as they appear in the path (no
 * trimming, no numeric conversion).
 *
 * @param {string} path - field path using dot and/or bracket notation
 * @returns {*} whatever the chained `r.row(...)(...)` calls produce
 */
function getRow(path) {
  // Splitting approach borrowed from the official provider.
  const segments = [];
  for (const piece of path.split(/[\[\]\.]/g)) {
    if (piece.trim().length > 0) {
      segments.push(piece);
    }
  }

  let accessor = r.row(segments[0]);
  for (let depth = 1; depth < segments.length; depth++) {
    accessor = accessor(segments[depth]);
  }
  return accessor;
}
/**
 * Deepstream listen callback that backs a "search?{...}" list with a
 * RethinkDB changefeed.
 *
 * The record name is the 7-character prefix "search?" followed by a JSON
 * object with these fields:
 *   - table  (required) name of the table to query
 *   - query  (optional) array of [path, operator, value] conditions, where
 *            operator is one of eq, match, gt, ge, lt, le, ne, in
 *            ('in' requires an array value)
 *   - order and limit (optional, must be given together) index to order by
 *            and maximum number of results
 *   - desc   (optional) truthy to order descending
 *
 * When `subscribed` is truthy the query is built and run as a changefeed;
 * on success the response is accepted and a deepstream list named `name`
 * is kept in sync with the `pk` values of the matching rows. Any invalid
 * request or failed query is logged and the response is rejected.
 * When `subscribed` is falsy the cursor for `name` is closed and its list
 * is deleted.
 *
 * @param {string} name - full record name, "search?" + JSON
 * @param {boolean} subscribed - truthy to start the search, falsy to tear it down
 * @param {{accept: Function, reject: Function}} response - listen response;
 *        only used when `subscribed` is truthy
 * @returns {undefined}
 */
function searchProvider(name, subscribed, response) {
  if (!subscribed) {
    const active = searches.get(name);
    if (!active)
      return console.error("Unexpected unsubscribe for", name);
    // Forget the search first so the cursor callback treats the resulting
    // "Cursor is closed." error as expected, and close the cursor before
    // touching the list so a failing list.delete() cannot leak the changefeed.
    searches.delete(name);
    active[1].close(err => {
      if (err) console.error("search close error", err);
    });
    active[0].delete();
    return;
  }

  // Log the problem and tell deepstream that this provider will not serve the search.
  const refuse = (...details) => {
    console.error(...details);
    response.reject();
  };

  let search;
  try {
    search = JSON.parse(name.slice('search?'.length));
  } catch (e) {
    return refuse("JSON parse failed:", e);
  }
  // JSON.parse happily returns null or primitives; only an object can carry parameters.
  if (search === null || typeof search !== 'object')
    return refuse('Search must be a JSON object');
  if (!search.table)
    return refuse('Missing parameter "table"');
  if (!search.order !== !search.limit) // XOR
    return refuse('Must specify both "order" and "limit" together');
  const conditions = search.query || [];
  if (!Array.isArray(conditions))
    return refuse('"query" must be a JSON array');

  const operators = new Set(['eq', 'match', 'gt', 'ge', 'lt', 'le', 'ne', 'in']);
  let feed;
  try {
    let query = r.table(search.table);
    // orderBy with an index must precede filters; must use an index for changefeeds
    // TODO: should we check for index / add index if doesn't exist?
    if (search.order)
      query = query.orderBy({ index: r[search.desc ? 'desc' : 'asc'](search.order) });

    for (const condition of conditions) {
      if (!Array.isArray(condition) || condition.length !== 3)
        return refuse("bad condition");
      const [path, operator, value] = condition;
      // getRow() splits the path, so anything but a string would throw there.
      if (typeof path !== 'string')
        return refuse("bad condition");
      if (!operators.has(operator))
        return refuse("bad operator");
      if (operator === 'in' && !Array.isArray(value))
        return refuse("'in' operator requires a JSON array");

      let predicate;
      if (operator !== 'in') {
        predicate = getRow(path)[operator](value);
      } else {
        // doesn't take advantage of indexes, possible enhancement
        // see https://www.rethinkdb.com/docs/sql-to-reql/javascript/
        // TODO: path can be a path, not just a field
        predicate = record => r.expr(value).contains(record(path));
      }
      query = query.filter(predicate);
    }

    // Only the primary-key field of each matching row goes into the list.
    query = query(pk);
    if (search.limit)
      query = query.limit(search.limit);

    // we have a probably valid query
    feed = query.changes({
      includeStates: true, includeInitial: true,
    }).run(conn);
  } catch (e) {
    // Anything thrown synchronously while building or starting the query
    // must still answer the listen request.
    return refuse("query failed:", e);
  }

  // NOTE(review): an unsubscribe that arrives before the cursor is delivered
  // is reported as unexpected and the cursor below is then never closed.
  feed.then(cursor => {
    response.accept();
    const list = client.record.getList(name);
    searches.set(name, [list, cursor]);

    // Until the feed reports 'ready', ids are collected here and written to the
    // list in one go; afterwards (initial === undefined) changes go straight to the list.
    let initial = new Set();

    function add(id) {
      if (initial) return initial.add(id);
      list.addEntry(id);
    }
    function remove(id) {
      if (initial) return initial.delete(id);
      list.removeEntry(id);
    }
    function replace(from, to) {
      if (initial) {
        // A Set has no filter/push, so swap the ids with Set operations.
        initial.delete(from);
        initial.add(to);
        return;
      }
      const entries = list.getEntries().filter(id => id !== from);
      entries.push(to);
      list.setEntries(entries);
    }
    function ready() {
      if (!initial) return; // already flushed
      list.setEntries(Array.from(initial));
      initial = undefined;
    }

    cursor.each((err, row) => {
      if (err) {
        if (err.message !== 'Cursor is closed.' || searches.has(name))
          console.error("Unexpected cursor error:", err);
        return;
      }
      // we already unsubscribed; we're done. The cursor was closed by the unsubscribe path.
      if (!searches.has(name))
        return false;
      if (row.state === 'ready')
        return ready();
      if (row.new_val && row.old_val)
        replace(row.old_val, row.new_val);
      else if (row.new_val)
        add(row.new_val);
      else if (row.old_val)
        remove(row.old_val);
    });
  }).catch(e => {
    response.reject();
    console.error("query failed:", e);
  });
}
// The listen pattern is a regular expression. In a string literal "\?" is just
// "?", which would make the "h" optional and match any name containing "searc";
// escape the backslash and anchor the pattern so only names that start with the
// literal 7-character prefix "search?" (which searchProvider strips) are handled.
client.record.listen('^search\\?', searchProvider);
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment