Last active
November 24, 2016 06:28
-
-
Save haadcode/52948777e9f2f69cd01bdcd9634ee324 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'

// Stream class that the subscribe response is piped into (see Pubsub#subscribe).
const PubsubMessageStream = require('ipfs-pubsub-message-stream')
/**
 * Client for the `pubsub/*` commands, issued through an injected `send`
 * function. It tracks at most one active subscription per topic so that a
 * subscription can be cancelled later.
 */
class Pubsub {
  // Active subscriptions keyed by topic name.
  _subscriptions: { [topic: string]: { connection: IncomingMessage, output: PubsubMessageStream } }
  // Function used to issue every command; its result is awaited.
  _send: Function

  /**
   * @param send Called with a command object (`path`, plus optional `args`
   *   and `options`). Its awaited result is used as the command's response.
   */
  constructor (send: Function) {
    // A prototype-less object is used so that topics named like inherited
    // Object members ('constructor', 'toString', ...) are not mistaken for
    // existing subscriptions.
    this._subscriptions = Object.create(null)
    this._send = send
  }

  /**
   * Subscribes to a topic.
   *
   * @param topic Topic to subscribe to.
   * @param options Extra command options. They are copied, never mutated;
   *   `discover` is always sent as `false`.
   * @returns The message stream for the topic, extended with a `cancel()`
   *   method that ends the subscription.
   * @throws Rejects with an Error if the topic is already subscribed.
   */
  async subscribe (topic: string, options: Object = {}) {
    // We don't allow multiple subscriptions at the moment
    if (this._subscriptions[topic]) {
      throw new Error(`Already subscribed to '${topic}'`)
    }

    const command = {
      path: 'pubsub/sub',
      args: [topic],
      // Copy into a fresh object so the caller's options are left untouched.
      options: Object.assign({}, options, { discover: false })
    }

    // Start the request
    const { res, req } = await this._send(command)

    // Convert the response from HTTP to Pubsub messages
    const stream = res.pipe(new PubsubMessageStream())

    // Add the request to the active subscriptions
    return this._addSubscription(topic, req, stream)
  }

  /**
   * Publishes data to a topic.
   *
   * @param topic Topic to publish to.
   * @param data Payload, sent as the second command argument.
   * @returns Whatever `send` resolves to for the `pubsub/pub` command.
   */
  async publish (topic: string, data: Buffer) {
    return this._send({
      path: 'pubsub/pub',
      args: [topic, data]
    })
  }

  /**
   * Runs the `pubsub/ls` command.
   *
   * @returns The `Strings` field of the response, or an empty array when it
   *   is missing or falsy.
   */
  async ls () {
    const res = await this._send({ path: 'pubsub/ls' })
    return res.Strings || []
  }

  /**
   * Runs the `pubsub/peers` command for a subscribed topic.
   *
   * @param topic Topic that must currently be subscribed.
   * @returns The `Strings` field of the response, or an empty array when it
   *   is missing or falsy.
   * @throws Rejects with an Error if the topic is not subscribed.
   */
  async peers (topic: string) {
    if (!this._subscriptions[topic]) {
      throw new Error(`Not subscribed to '${topic}'`)
    }

    const res = await this._send({
      path: 'pubsub/peers',
      args: [topic]
    })
    return res.Strings || []
  }

  /**
   * Registers an active subscription and makes its output cancellable.
   *
   * @param topic Topic the subscription belongs to.
   * @param connection Object whose `abort()` is called on cancellation.
   * @param output Stream handed back to the subscriber; its `end()` is called
   *   on cancellation.
   * @returns `output`, with a `cancel()` method attached.
   */
  _addSubscription (topic: string, connection: IncomingMessage, output: PubsubMessageStream) {
    // Add a cancel method so that the subscription can be cleanly cancelled
    output.cancel = () => this._removeSubscription(topic)
    this._subscriptions[topic] = { connection: connection, output: output }
    return output
  }

  /**
   * Aborts the connection, ends the output stream and forgets the topic.
   *
   * @param topic Topic to unsubscribe from.
   * @throws Error if the topic is not subscribed.
   */
  _removeSubscription (topic: string) {
    const subscription = this._subscriptions[topic]
    if (!subscription) {
      throw new Error(`Not subscribed to ${topic}`)
    }

    subscription.connection.abort()
    subscription.output.end()
    delete this._subscriptions[topic]
  }
}
module.exports = Pubsub
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment