Last active
October 15, 2019 10:43
-
-
Save vasco-santos/2c39ab16d15a87c573bef4f916472831 to your computer and use it in GitHub Desktop.
Pubsub base protocol proposal for libp2p refactor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict' | |
const assert = require('assert') | |
const debug = require('debug') | |
const EventEmitter = require('events') | |
const errcode = require('err-code') | |
const PeerInfo = require('peer-info') | |
const message = require('./message') | |
const Peer = require('./peer') | |
const utils = require('./utils') | |
/** | |
* PubsubBaseProtocol handles the peers and connections logic for pubsub routers | |
*/ | |
class PubsubBaseProtocol extends EventEmitter { | |
/** | |
* @param {String} debugName | |
* @param {String} multicodec | |
* @param {PeerInfo} peerInfo | |
* @param {Object} registration | |
* @param {function} registration.register | |
* @param {function} registration.unregister | |
* @param {Object} [options] | |
* @param {boolean} [options.signMessages] if messages should be signed, defaults to true | |
* @param {boolean} [options.strictSigning] if message signing should be required, defaults to true | |
* @constructor | |
*/ | |
constructor (debugName, multicodec, peerInfo, registration, options = {}) { | |
assert(debugName && typeof debugName === 'string', 'a debugname `string` is required') | |
assert(multicodec && typeof multicodec === 'string', 'a multicodec `string` is required') | |
assert(PeerInfo.isPeerInfo(peerInfo), 'peer info must be an instance of `peer-info`') | |
// Registration handling | |
assert(registration && typeof registration.register === 'function', 'a register function must be provided in registration') | |
assert(registration && typeof registration.unregister === 'function', 'a unregister function must be provided in registration') | |
super() | |
options = { | |
signMessages: true, | |
strictSigning: true, | |
...options | |
} | |
this.log = debug(debugName) | |
this.log.err = debug(`${debugName}:error`) | |
this.multicodec = multicodec | |
this.peerInfo = peerInfo | |
this.registration = registration | |
this.started = false | |
if (options.signMessages) { | |
this.peerId = this.peerInfo.id | |
} | |
/** | |
* Map of topics to which peers are subscribed to | |
* | |
* @type {Map<string, Peer>} | |
*/ | |
this.topics = new Map() | |
/** | |
* Map of peers. | |
* | |
* @type {Map<string, Peer>} | |
*/ | |
this.peers = new Map() | |
} | |
/** | |
* Mounts the pubsub protocol onto the libp2p node and sends our | |
* subscriptions to every peer conneceted | |
* @returns {Promise} | |
*/ | |
async start () { | |
if (this.started) { | |
return | |
} | |
this.log('starting') | |
// register protocol with multicodec and handlers | |
await this.registration.register(this.multicodec, { | |
onConnect: this._onPeerPeerConnected, | |
onDisconnect: this._onPeerDisconnected | |
}) | |
this.log('started') | |
this.started = true | |
} | |
/** | |
* Unmounts the pubsub protocol and shuts down every connection | |
* @returns {void} | |
*/ | |
stop () { | |
if (!this.started) { | |
throw errcode(new Error('not started yet'), 'ERR_NOT_STARTED_YET') | |
} | |
// unregister protocol and handlers | |
await this.registration.unregister(this.multicodec) | |
this.log('stopping') | |
this.peers.forEach((peer) => peer.close()) | |
this.peers = new Map() | |
this.started = false | |
this.log('stopped') | |
} | |
/** | |
* Peer connected successfully with pubsub protocol. | |
* @private | |
* @param {PeerInfo} peerInfo peer info | |
* @param {Connection} conn connection to the peer | |
*/ | |
_onPeerConnected (peerInfo, conn) { | |
const idB58Str = peerInfo.id.toB58String() | |
this.log('connected', idB58Str) | |
const peer = this._addPeer(new Peer(peerInfo)) | |
peer.attachConnection(conn) | |
this._processMessages(idB58Str, conn, peer) | |
} | |
/** | |
* Peer disconnected. | |
* @private | |
* @param {PeerInfo} peerInfo peer info | |
* @param {Error} err error for connection end | |
*/ | |
_onPeerDisconnected (peerInfo, err) { | |
const idB58Str = peerInfo.id.toB58String() | |
const peer = this.peers.get(idB58Str) | |
this.log('connection ended', idB58Str, err ? err.message : '') | |
this._removePeer(peer) | |
} | |
/** | |
* Add a new connected peer to the peers map. | |
* @private | |
* @param {PeerInfo} peer peer info | |
* @returns {PeerInfo} | |
*/ | |
_addPeer (peer) { | |
const id = peer.info.id.toB58String() | |
/* | |
Always use an existing peer. | |
What is happening here is: "If the other peer has already dialed to me, we already have | |
an establish link between the two, what might be missing is a | |
Connection specifically between me and that Peer" | |
*/ | |
let existing = this.peers.get(id) | |
if (!existing) { | |
this.log('new peer', id) | |
this.peers.set(id, peer) | |
existing = peer | |
peer.once('close', () => this._removePeer(peer)) | |
} | |
++existing._references | |
return existing | |
} | |
/** | |
* Remove a peer from the peers map if it has no references. | |
* @private | |
* @param {Peer} peer peer state | |
* @returns {PeerInfo} | |
*/ | |
_removePeer (peer) { | |
const id = peer.info.id.toB58String() | |
this.log('remove', id, peer._references) | |
// Only delete when no one else is referencing this peer. | |
if (--peer._references === 0) { | |
this.log('delete peer', id) | |
this.peers.delete(id) | |
} | |
return peer | |
} | |
/** | |
* Get a list of the peer-ids that are subscribed to one topic. | |
* @param {string} topic | |
* @returns {Array<string>} | |
*/ | |
getPeersSubscribed (topic) { | |
if (!topic || typeof topic !== 'string') { | |
throw errcode(new Error('a string topic must be provided'), 'ERR_NOT_VALID_TOPIC') | |
} | |
return Array.from(peer.values()) | |
.filter((peer) => peer.topics.has(topic)) | |
.map((peer) => peer.info.id.toB58String()) | |
} | |
/** | |
* Validates the given message. The signature will be checked for authenticity. | |
* @param {rpc.RPC.Message} message | |
* @returns {Promise<Boolean>} | |
*/ | |
async validate (message) { // eslint-disable-line require-await | |
// If strict signing is on and we have no signature, abort | |
if (this.strictSigning && !message.signature) { | |
this.log('Signing required and no signature was present, dropping message:', message) | |
return Promise.resolve(false) | |
} | |
// Check the message signature if present | |
if (message.signature) { | |
return verifySignature(message) | |
} else { | |
return Promise.resolve(true) | |
} | |
} | |
/** | |
* Overriding the implementation of publish should handle the appropriate algorithms for the publish/subscriber implementation. | |
* For example, a Floodsub implementation might simply publish each message to each topic for every peer | |
* @abstract | |
* @param {Array<string>|string} topics | |
* @param {Array<any>|any} messages | |
* @returns {undefined} | |
* | |
*/ | |
publish (topics, messages) { | |
throw errcode('publish must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') | |
} | |
/** | |
* Overriding the implementation of subscribe should handle the appropriate algorithms for the publish/subscriber implementation. | |
* For example, a Floodsub implementation might simply send a message for every peer showing interest in the topics | |
* @abstract | |
* @param {Array<string>|string} topics | |
* @returns {undefined} | |
*/ | |
subscribe (topics) { | |
throw errcode('subscribe must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') | |
// TODO: add to docs that implementations must call addSubscription() | |
} | |
/** | |
* Overriding the implementation of unsubscribe should handle the appropriate algorithms for the publish/subscriber implementation. | |
* For example, a Floodsub implementation might simply send a message for every peer revoking interest in the topics | |
* @abstract | |
* @param {Array<string>|string} topics | |
* @returns {undefined} | |
*/ | |
unsubscribe (topics) { | |
throw errcode('unsubscribe must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') | |
} | |
/** | |
* Overriding the implementation of getTopics should handle the appropriate algorithms for the publish/subscriber implementation. | |
* Get the list of subscriptions the peer is subscribed to. | |
* @abstract | |
* @returns {Array<string>} | |
*/ | |
getTopics () { | |
throw errcode('getTopics must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') | |
} | |
/** | |
* Overriding the implementation of _processMessages should keep the connection and is | |
* responsible for processing each RPC message received by other peers. | |
* @abstract | |
* @param {string} idB58Str peer id string in base58 | |
* @param {Connection} conn connection | |
* @param {PeerInfo} peer peer info | |
* @returns {undefined} | |
* | |
*/ | |
_processMessages (idB58Str, conn, peer) { | |
throw errcode('_processMessages must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') | |
} | |
} | |
module.exports = PubsubBaseProtocol | |
module.exports.message = message | |
module.exports.utils = utils |
Cool, I will consider this to the PR!
Hey 👋
Regarding your comment with the registrar, my previous idea was:
// register protocol with multicodec and handlers
await this.registration.register(this.multicodec, {
onConnect: this._onPeerPeerConnected,
onDisconnect: this._onPeerDisconnected
})
// unregister before stop
await this.registration.unregister(this.multicodec)
With your comment:
registrar.request(protocol, { min: 10, max: 20 })
I am iterating as:
const handlers = {
onConnect: this._onPeerPeerConnected,
onDisconnect: this._onPeerDisconnected
}
const topology = { min: 10, max: 20 }
// Subsystem start
await registrar.bind(this.multicodec, handlers, topology)
// Subsystem stop
await registrar.unbind(this.multicodec)
I didn't find request a good naming because we need to have an opposite action, and we do a "request" we usually provide an action associated (example HTTP REQ)
Other solution for the handlers, was the registrar to be an EventEmitter, I am not sure what I prefer though
I would go with .register
and .unregister
. The main thought behind request
was that a system was asking for a certain amount of resources, but they're not necessarily going to get them if they ask for an invalid amount, but I think we can potentially throw an error on register
about asking for an invalid Topology.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
As we go through this I'd like to think about registration in terms of Topology requests, libp2p/notes#13. I think
registration
could becomeregistrar
and that would end up being theTopologyRegistrar
the Connection Manager and libp2p uses. It could be very basic for now, but we could eventually evolve that to allow for more complex Topology registration/interactions.Although it may be better to pass a
Topology
object to the request, and we could create a basic Topology to start with that just tries to maintain a range of connections on a given protocol.