Created November 18, 2019 22:26
// This is an example of a sparse source for kappa5 pubsub
const Bitfield = require('bitfield-db')
const nanoiterator = require('nanoiterator')
const ram = require('random-access-memory')

module.exports = function createIndexer (handlers, opts) {
  return new SparseIndexer(handlers, opts)
}
class SparseIndexer {
  constructor (handlers, opts) {
    this.handlers = handlers
    this.feed = opts.feed
    this.indexed = new Bitfield(ram())
    this.live = typeof opts.live === 'undefined' ? true : opts.live
    if (!opts.onquery) throw new Error('onquery opt is required.')
    this._onquery = opts.onquery
    // Register the query extension.
    this._registerExtension()
  }

  // Broadcast a topic query to all connected peers
  // (invoked as flow.source.query(topic) from the view's remoteQuery api).
  query (topic) {
    this._ext.broadcast({
      type: 'query',
      topic
    })
  }

  open (done) {
    this.feed.ready(() => {
      this.id = this.feed.key.toString('hex')
      // Attach update handlers.
      this.feed.on('append', this.handlers.onupdate)
      this.feed.on('download', this.handlers.onupdate)
      if (this.live) {
        // In live mode, also download blocks the remote appends from now on.
        this.feed.once('remote-update', peer => {
          this.feed.download({ start: peer.remoteLength })
        })
      }
      // Download the latest block so we know the feed length.
      this.feed.update(done)
    })
  }

  pull (state, done) {
    const self = this
    const feed = this.feed
    const has = this.feed.bitfield
    const indexed = this.indexed
    const ite = nanoiterator({ next: iterate })
    let i = -1

    collect(ite, 20, (seqs) => {
      if (seqs.length) index(seqs)
      else done()
    })

    // This iterates over the full bitfield of the hypercore
    // (starting at 0) and for each 1 (= available seq) it
    // checks the indexed bitfield to see whether this seq was
    // already indexed. It does this, starting at 0, every time
    // the pull function is invoked. This is of course
    // quite inefficient, especially for long feeds that
    // are mostly indexed. However, I did not yet find
    // a way to make this more efficient without
    // recreating the hypercore bitfield. Maybe the latter
    // is what's needed, but I'm not sure yet. Best would be
    // a fast way to XOR two bitfields (one possible direction
    // is sketched in pendingSeqs() below the class).
    function iterate (cb) {
      let have = false
      while (have === false) {
        i++
        if (i === feed.length) return cb(null, null)
        have = has.get(i)
      }
      self.indexed.has(i, (err, exists) => {
        if (err) return cb(err)
        if (!exists) cb(null, i)
        else iterate(cb)
      })
    }

    function index (seqs) {
      // Sort seqs into consecutive batches because
      // hypercore.getBatch is more efficient for ranges.
      const batches = intoBatches(seqs)
      const opts = { wait: false }
      // Now get all batches and combine the results.
      let pending = batches.length
      let results = []
      let error = null
      batches.forEach(batch => {
        const { start, end } = batch
        feed.getBatch(start, end, opts, (err, res) => onbatch(err, res, start))
      })
      function onbatch (err, res, start) {
        if (err) {
          error = error || err
        } else {
          res = res.map((value, i) => {
            return {
              key: feed.key.toString('hex'),
              seq: start + i,
              value
            }
          })
          results = results.concat(res)
        }
        if (--pending === 0) finish(error, results, seqs)
      }
    }

    // Now add the indexed seqs to our indexed bitfield
    // and pass on the messages.
    function finish (err, results, seqs) {
      if (err) return done(err)
      for (const seq of seqs) {
        indexed.add(seq)
      }
      indexed.flush(() => {
        indexed.rank(feed.length, (err, count) => {
          const workLeft = count < has.total()
          done(null, results, workLeft)
        })
      })
    }
  }

  _registerExtension () {
    const name = '__sparse-indexer-query'
    this._ext = this.feed.registerExtension(name, {
      encoding: 'json',
      onmessage: this._onExtensionMessage.bind(this)
    })
  }

  _onExtensionMessage (message, peer) {
    if (message.type === 'query') {
      this._onquery(message.topic, (err, feeds) => {
        if (err || !feeds) return
        const res = {
          type: 'response',
          topic: message.topic,
          feeds
        }
        this._ext.send(res, peer)
      })
    }
    if (message.type === 'response') {
      const { feeds } = message
      // TODO: This is where the "multifeed" part would come in.
      // Add missing feeds etc.
      if (feeds && feeds[this.id]) {
        const missingSeqs = feeds[this.id].filter(seq => !this.feed.has(seq))
        const batches = intoBatches(missingSeqs)
        batches.forEach(({ start, end }) => {
          this.feed.download({ start, end })
        })
      }
    }
  }
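
  // For reference, the extension messages exchanged above have these shapes
  // (the shortened key is taken from the test output below; the seq list is
  // just the first few matching seqs, for illustration):
  //   query:    { type: 'query', topic: 'red' }
  //   response: { type: 'response', topic: 'red', feeds: { '9182f3..': [0, 4, 8] } }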
}
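
// Hedged sketch, not used above: the comment in pull() wishes for a fast way
// to combine two bitfields. If both the "have" and the "indexed" bitfields
// were available as plain 32-bit word arrays (an assumption for illustration,
// not an actual hypercore or bitfield-db API), the pending seqs could be
// computed word-wise as have AND NOT indexed, skipping whole words that are
// already fully indexed. Bit order within a word is assumed LSB-first here.
function pendingSeqs (haveWords, indexedWords, length) {
  const seqs = []
  for (let w = 0; w < haveWords.length; w++) {
    let word = haveWords[w] & ~(indexedWords[w] || 0)
    while (word) {
      const bit = 31 - Math.clz32(word & -word) // position of the lowest set bit
      const seq = w * 32 + bit
      if (seq >= length) return seqs
      seqs.push(seq)
      word &= word - 1 // clear the lowest set bit
    }
  }
  return seqs
}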

function collect (ite, max, cb) {
  const buf = []
  ite.next(onnext)
  function onnext (err, seq) {
    // Treat errors and the end of the iterator the same: flush what we have.
    if (err || seq === null) return cb(buf)
    buf.push(seq)
    if (buf.length < max) ite.next(onnext)
    else cb(buf)
  }
}

function intoBatches (seqs) {
  // Expects seqs sorted ascending. Returns ranges with exclusive end,
  // matching hypercore's getBatch(start, end).
  const batches = []
  if (!seqs.length) return batches
  let start = seqs[0]
  let end = start + 1
  for (let i = 0; i < seqs.length; i++) {
    const seq = seqs[i]
    if (seq > end) {
      batches.push({ start, end })
      start = seq
    }
    end = seq + 1
  }
  batches.push({ start, end })
  return batches
}
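
// Example (assuming sorted input, as above):
//   intoBatches([2, 3, 4, 9, 10]) => [{ start: 2, end: 5 }, { start: 9, end: 11 }]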

❯ npm run test

> [email protected] test /home/bit/Code/node/kappa-core
> tape test/*.js

TAP version 13
# sparse

First replication done
----------------------

local
  Key: 9182f3..
  Writable: true
  Length: 101
  Downloaded: 101
  Topics: red: 26 blue: 25 green: 25 yellow: 25
remote
  Key: 9182f3..
  Writable: false
  Length: 101
  Downloaded: 0
  Topics:

Queried for "red"
-----------------

local
  Key: 9182f3..
  Writable: true
  Length: 101
  Downloaded: 101
  Topics: red: 26 blue: 25 green: 25 yellow: 25
remote
  Key: 9182f3..
  Writable: false
  Length: 101
  Downloaded: 26
  Topics: red: 26

Appended two reds one blue
--------------------------

local
  Key: 9182f3..
  Writable: true
  Length: 104
  Downloaded: 104
  Topics: red: 28 blue: 26 green: 25 yellow: 25
remote
  Key: 9182f3..
  Writable: false
  Length: 104
  Downloaded: 29
  Topics: red: 28 blue: 1

1..0
# tests 0
# pass 0
# ok

const tape = require('tape')
const hypercore = require('hypercore')
const ram = require('random-access-memory')
const { runAll, replicate } = require('./lib/util')
const { Kappa } = require('kappa-core')
const createIndexer = require('./hypercore-sparse')

const topics = ['red', 'blue', 'green', 'yellow']

tape.only('sparse', async t => {
  var local, remote
  runAll([
    // Create two feeds and a kappa for each.
    cb => {
      local = init()
      local.feed.ready(cb)
    },
    cb => {
      remote = init(local.feed.key)
      remote.feed.ready(cb)
    },
    // Fill the local feed with some data.
    cb => {
      const batch = []
      for (let i = 0; i <= 100; i++) {
        batch[i] = { topic: topics[i % 4], i }
      }
      local.feed.append(batch, cb)
    },
    // Replicate the two feeds.
    cb => replicate(local.feed, remote.feed, cb),
    cb => logAll(cb, 'First replication done'),
    // Make a query.
    cb => {
      remote.kappa.api.topics.remoteQuery('red')
      setImmediate(cb)
    },
    cb => logAll(cb, 'Queried for "red"'),
    // Add some more data.
    cb => {
      local.feed.append([
        { topic: 'red' },
        { topic: 'blue' },
        { topic: 'red' }
      ], cb)
    },
    cb => setImmediate(cb),
    cb => logAll(cb, 'Appended two reds one blue'),
    // End.
    cb => t.end()
  ])

  function logAll (cb, msg) {
    console.log(`\n${msg}\n${'-'.repeat(msg.length)}\n`)
    runAll([
      cb => log('local', local.feed, local.kappa, cb),
      cb => log('remote', remote.feed, remote.kappa, cb),
      () => cb()
    ])
  }
})

function init (key) {
  const feed = hypercore(ram, key, {
    valueEncoding: 'json',
    sparse: true
  })
  const kappa = new Kappa()
  kappa.use('topics', createTopicView())
  kappa.source('sparsefeed', createIndexer, {
    feed,
    onquery (topic, cb) {
      kappa.api.topics.query(topic, cb)
    }
  })
  return { feed, kappa }
}

function createTopicView () {
  const topics = {}
  const view = {
    map (msgs, next) {
      for (const msg of msgs) {
        if (msg.value && msg.value.topic) {
          const topic = msg.value.topic
          topics[topic] = topics[topic] || []
          topics[topic].push({ seq: msg.seq, key: msg.key })
        }
      }
      next()
    },
    api: {
      query (kappa, topic, cb) {
        this.ready(() => {
          if (!topics[topic]) return cb()
          const feeds = topics[topic].reduce((agg, msg) => {
            agg[msg.key] = agg[msg.key] || []
            agg[msg.key].push(msg.seq)
            return agg
          }, {})
          cb(null, feeds)
        })
      },
      all (kappa, cb) {
        this.ready(() => cb(null, topics))
      },
      remoteQuery (kappa, topic) {
        const flows = kappa.flowsByView(this.name)
        for (const flow of flows) {
          if (flow.source.query) {
            flow.source.query(topic)
          }
        }
      }
    }
  }
  return view
}

function log (name, feed, kappa, cb) {
  kappa.api.topics.all((err, res) => {
    console.log(`${name}
  Key: ${shortkey(feed)}
  Writable: ${feed.writable}
  Length: ${feed.length}
  Downloaded: ${feed.downloaded()}
  Topics: ${logTopics(res)}`)
    cb()
  })
  function logTopics (topics) {
    let str = ''
    for (const [key, value] of Object.entries(topics)) {
      str += `${key}: ${value.length} `
    }
    return str
  }
}

function shortkey (feed) {
  feed._label = feed.key.toString('hex').substring(0, 6) + '..'
  return feed._label
}

function pad (str, i) {
  if (str.length < i) str = str + ' '.repeat(i - str.length)
  return str
}