Last active
October 15, 2019 10:43
-
-
Save vasco-santos/2c39ab16d15a87c573bef4f916472831 to your computer and use it in GitHub Desktop.
Pubsub base protocol proposal for libp2p refactor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'

// Third-party and Node core dependencies.
const assert = require('assert')
const debug = require('debug')
const EventEmitter = require('events')
const errcode = require('err-code')
const PeerInfo = require('peer-info')

// Project-local modules.
const message = require('./message')
const Peer = require('./peer')
const utils = require('./utils')
/**
 * PubsubBaseProtocol handles the peers and connections logic for pubsub routers.
 *
 * Concrete routers extend this class and implement `publish`, `subscribe`,
 * `unsubscribe`, `getTopics` and `_processMessages`; the base versions only throw
 * an `ERR_NOT_IMPLEMENTED` error.
 */
class PubsubBaseProtocol extends EventEmitter {
  /**
   * @param {String} debugName name handed to `debug` for the log namespaces
   * @param {String} multicodec protocol identifier passed to the registration
   * @param {PeerInfo} peerInfo peer info of the local node
   * @param {Object} registration
   * @param {function} registration.register
   * @param {function} registration.unregister
   * @param {Object} [options]
   * @param {boolean} [options.signMessages] if messages should be signed, defaults to true
   * @param {boolean} [options.strictSigning] if message signing should be required, defaults to true
   * @constructor
   */
  constructor (debugName, multicodec, peerInfo, registration, options = {}) {
    assert(debugName && typeof debugName === 'string', 'a debugname `string` is required')
    assert(multicodec && typeof multicodec === 'string', 'a multicodec `string` is required')
    assert(PeerInfo.isPeerInfo(peerInfo), 'peer info must be an instance of `peer-info`')

    // Registration handling
    assert(registration && typeof registration.register === 'function', 'a register function must be provided in registration')
    assert(registration && typeof registration.unregister === 'function', 'a unregister function must be provided in registration')

    super()

    const opts = {
      signMessages: true,
      strictSigning: true,
      ...options
    }

    this.log = debug(debugName)
    this.log.err = debug(`${debugName}:error`)
    this.multicodec = multicodec
    this.peerInfo = peerInfo
    this.registration = registration
    this.started = false

    // validate() reads this flag, so it has to be stored on the instance;
    // otherwise strict signing would silently be treated as disabled.
    this.strictSigning = opts.strictSigning

    if (opts.signMessages) {
      this.peerId = this.peerInfo.id
    }

    /**
     * Map of topics to which peers are subscribed to
     *
     * @type {Map<string, Peer>}
     */
    this.topics = new Map()

    /**
     * Map of peers, keyed by the base58 string of the peer id.
     *
     * @type {Map<string, Peer>}
     */
    this.peers = new Map()

    // The handlers are handed to the registration as bare function references,
    // so they must be bound to keep `this` pointing at this instance.
    this._onPeerConnected = this._onPeerConnected.bind(this)
    this._onPeerDisconnected = this._onPeerDisconnected.bind(this)
  }

  /**
   * Registers the pubsub multicodec together with the connect/disconnect
   * handlers. Calling it on an already started instance is a no-op.
   * @returns {Promise<void>}
   */
  async start () {
    if (this.started) {
      return
    }
    this.log('starting')

    // register protocol with multicodec and handlers
    await this.registration.register(this.multicodec, {
      onConnect: this._onPeerConnected,
      onDisconnect: this._onPeerDisconnected
    })

    this.log('started')
    this.started = true
  }

  /**
   * Unregisters the pubsub protocol and closes every known peer.
   * Rejects with an `ERR_NOT_STARTED_YET` error when the instance is not started.
   * @returns {Promise<void>}
   */
  async stop () {
    if (!this.started) {
      throw errcode(new Error('not started yet'), 'ERR_NOT_STARTED_YET')
    }
    this.log('stopping')

    // unregister protocol and handlers
    await this.registration.unregister(this.multicodec)

    this.peers.forEach((peer) => peer.close())
    this.peers = new Map()
    this.started = false
    this.log('stopped')
  }

  /**
   * Peer connected successfully with pubsub protocol.
   * @private
   * @param {PeerInfo} peerInfo peer info
   * @param {Connection} conn connection to the peer
   */
  _onPeerConnected (peerInfo, conn) {
    const idB58Str = peerInfo.id.toB58String()
    this.log('connected', idB58Str)

    const peer = this._addPeer(new Peer(peerInfo))
    peer.attachConnection(conn)

    this._processMessages(idB58Str, conn, peer)
  }

  /**
   * Peer disconnected.
   * @private
   * @param {PeerInfo} peerInfo peer info
   * @param {Error} err error for connection end
   */
  _onPeerDisconnected (peerInfo, err) {
    const idB58Str = peerInfo.id.toB58String()
    const peer = this.peers.get(idB58Str)

    this.log('connection ended', idB58Str, err ? err.message : '')

    // The peer may already be gone (or was never tracked); _removePeer
    // dereferences `peer.info`, so there is nothing to do in that case.
    if (!peer) {
      return
    }
    this._removePeer(peer)
  }

  /**
   * Add a new connected peer to the peers map, or reuse the entry already
   * stored under the same id, and bump its reference count.
   * @private
   * @param {Peer} peer peer state
   * @returns {Peer} the peer stored in the map (the existing one when present)
   */
  _addPeer (peer) {
    const id = peer.info.id.toB58String()

    /*
      Always use an existing peer.
      What is happening here is: "If the other peer has already dialed to me, we already have
      an establish link between the two, what might be missing is a
      Connection specifically between me and that Peer"
    */
    let existing = this.peers.get(id)
    if (!existing) {
      this.log('new peer', id)
      this.peers.set(id, peer)
      existing = peer

      peer.once('close', () => this._removePeer(peer))
    }
    ++existing._references

    return existing
  }

  /**
   * Drop one reference to a peer and remove it from the peers map once the
   * reference count reaches zero.
   * @private
   * @param {Peer} peer peer state
   * @returns {Peer}
   */
  _removePeer (peer) {
    const id = peer.info.id.toB58String()

    this.log('remove', id, peer._references)

    // Only delete when no one else is referencing this peer.
    if (--peer._references === 0) {
      this.log('delete peer', id)
      this.peers.delete(id)
    }

    return peer
  }

  /**
   * Get a list of the peer-ids that are subscribed to one topic.
   * @param {string} topic
   * @returns {Array<string>} base58 peer id strings
   * @throws {Error} with code `ERR_NOT_VALID_TOPIC` when topic is not a non-empty string
   */
  getPeersSubscribed (topic) {
    if (!topic || typeof topic !== 'string') {
      throw errcode(new Error('a string topic must be provided'), 'ERR_NOT_VALID_TOPIC')
    }

    return Array.from(this.peers.values())
      .filter((peer) => peer.topics.has(topic))
      .map((peer) => peer.info.id.toB58String())
  }

  /**
   * Validates the given message. An unsigned message is rejected when strict
   * signing is on; a signed message is handed to `verifySignature`.
   * @param {rpc.RPC.Message} message
   * @returns {Promise<Boolean>}
   */
  async validate (message) { // eslint-disable-line require-await
    // If strict signing is on and we have no signature, abort
    if (this.strictSigning && !message.signature) {
      this.log('Signing required and no signature was present, dropping message:', message)
      return false
    }

    // Check the message signature if present
    if (message.signature) {
      // NOTE(review): `verifySignature` is not among the requires visible in this
      // file - confirm where it is defined and import it, otherwise this line
      // throws a ReferenceError.
      return verifySignature(message)
    }

    return true
  }

  /**
   * Overriding the implementation of publish should handle the appropriate algorithms for the publish/subscriber implementation.
   * For example, a Floodsub implementation might simply publish each message to each topic for every peer
   * @abstract
   * @param {Array<string>|string} topics
   * @param {Array<any>|any} messages
   * @returns {undefined}
   *
   */
  publish (topics, messages) {
    throw errcode(new Error('publish must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED')
  }

  /**
   * Overriding the implementation of subscribe should handle the appropriate algorithms for the publish/subscriber implementation.
   * For example, a Floodsub implementation might simply send a message for every peer showing interest in the topics
   * @abstract
   * @param {Array<string>|string} topics
   * @returns {undefined}
   */
  subscribe (topics) {
    // TODO: add to docs that implementations must call addSubscription()
    throw errcode(new Error('subscribe must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED')
  }

  /**
   * Overriding the implementation of unsubscribe should handle the appropriate algorithms for the publish/subscriber implementation.
   * For example, a Floodsub implementation might simply send a message for every peer revoking interest in the topics
   * @abstract
   * @param {Array<string>|string} topics
   * @returns {undefined}
   */
  unsubscribe (topics) {
    throw errcode(new Error('unsubscribe must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED')
  }

  /**
   * Overriding the implementation of getTopics should handle the appropriate algorithms for the publish/subscriber implementation.
   * Get the list of subscriptions the peer is subscribed to.
   * @abstract
   * @returns {Array<string>}
   */
  getTopics () {
    throw errcode(new Error('getTopics must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED')
  }

  /**
   * Overriding the implementation of _processMessages should keep the connection and is
   * responsible for processing each RPC message received by other peers.
   * @abstract
   * @param {string} idB58Str peer id string in base58
   * @param {Connection} conn connection
   * @param {Peer} peer peer state returned by `_addPeer`
   * @returns {undefined}
   *
   */
  _processMessages (idB58Str, conn, peer) {
    throw errcode(new Error('_processMessages must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED')
  }
}
// The class is the module's main export; the message and utils modules are
// attached to it as properties.
module.exports = PubsubBaseProtocol
module.exports.message = message
module.exports.utils = utils
I would go with `.register` and `.unregister`. The main thought behind `request` was that a system was asking for a certain amount of resources, but it is not necessarily going to get them if it asks for an invalid amount. I think we can potentially throw an error on `register` about asking for an invalid Topology.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hey 👋
Regarding your comment with the registrar, my previous idea was:
With your comment:
I am iterating as:
I didn't find `request` a good name because we need to have an opposite action, and when we do a "request" we usually provide an associated action (for example, an HTTP request).
Another solution for the handlers would be for the registrar to be an EventEmitter; I am not sure which I prefer, though.