Skip to content

Instantly share code, notes, and snippets.

@songlairui
Last active November 3, 2018 15:53
Show Gist options
  • Save songlairui/143017c388e175fcb7cc4096ba3a5b90 to your computer and use it in GitHub Desktop.
Save songlairui/143017c388e175fcb7cc4096ba3a5b90 to your computer and use it in GitHub Desktop.
hapijs/nes 提供了浏览器端的基于浏览器 websocket(hybi-13) 的api , 在小程序中使用client,需要先模拟这个WebSocket
import WebSocket from './websocket'
export default factory()
// below copy from https://github.com/hapijs/nes/blob/master/lib/client.js
function factory() {
// Utilities
const version = '2'
const ignore = function() {}
const stringify = function(message) {
try {
return JSON.stringify(message)
} catch (err) {
throw new NesError(err, errorTypes.USER)
}
}
const nextTick = function(callback) {
return err => {
setTimeout(() => callback(err), 0)
}
}
// NesError types
const errorTypes = {
TIMEOUT: 'timeout',
DISCONNECT: 'disconnect',
SERVER: 'server',
PROTOCOL: 'protocol',
WS: 'ws',
USER: 'user'
}
const NesError = function(err, type) {
if (typeof err === 'string') {
err = new Error(err)
}
err.type = type
err.isNes = true
try {
throw err // ensure stack trace for IE11
} catch (withStack) {
return withStack
}
}
// Error codes
const errorCodes = {
1000: 'Normal closure',
1001: 'Going away',
1002: 'Protocol error',
1003: 'Unsupported data',
1004: 'Reserved',
1005: 'No status received',
1006: 'Abnormal closure',
1007: 'Invalid frame payload data',
1008: 'Policy violation',
1009: 'Message too big',
1010: 'Mandatory extension',
1011: 'Internal server error',
1015: 'TLS handshake'
}
// Client
const Client = function(url, options) {
options = options || {}
// Configuration
this._url = url
this._settings = options
this._heartbeatTimeout = false // Server heartbeat configuration
// State
this._ws = null
this._reconnection = null
this._reconnectionTimer = null
this._ids = 0 // Id counter
this._requests = {} // id -> { resolve, reject, timeout }
this._subscriptions = {} // path -> [callbacks]
this._heartbeat = null
this._packets = []
this._disconnectListeners = null
this._disconnectRequested = false
// Events
this.onError = err => console.error(err) // General error handler (only when an error cannot be associated with a request)
this.onConnect = ignore // Called whenever a connection is established
this.onDisconnect = ignore // Called whenever a connection is lost: function(willReconnect)
this.onHeartbeatTimeout = ignore // Called when a heartbeat timeout will cause a disconnection
this.onUpdate = ignore
// Public properties
this.id = null // Assigned when hello response is received
}
Client.WebSocket = WebSocket
Client.prototype.connect = function(options) {
options = options || {}
if (this._reconnection) {
return Promise.reject(
new NesError(
'Cannot connect while client attempts to reconnect',
errorTypes.USER
)
)
}
if (this._ws) {
return Promise.reject(new NesError('Already connected', errorTypes.USER))
}
if (options.reconnect !== false) {
// Defaults to true
this._reconnection = {
// Options: reconnect, delay, maxDelay
wait: 0,
delay: options.delay || 1000, // 1 second
maxDelay: options.maxDelay || 5000, // 5 seconds
retries: options.retries || Infinity, // Unlimited
settings: {
auth: options.auth,
timeout: options.timeout
}
}
} else {
this._reconnection = null
}
return new Promise((resolve, reject) => {
this._connect(options, true, err => {
if (err) {
return reject(err)
}
return resolve()
})
})
}
Client.prototype._connect = function(options, initial, next) {
const ws = new Client.WebSocket(this._url, this._settings.ws) // Settings used by node.js only
this._ws = ws
clearTimeout(this._reconnectionTimer)
this._reconnectionTimer = null
const reconnect = event => {
if (ws.onopen) {
finalize(
new NesError(
'Connection terminated while waiting to connect',
errorTypes.WS
)
)
}
const wasRequested = this._disconnectRequested // Get value before _cleanup()
this._cleanup()
const log = {
code: event.code,
explanation: errorCodes[event.code] || 'Unknown',
reason: event.reason,
wasClean: event.wasClean,
willReconnect: this._willReconnect(),
wasRequested
}
this.onDisconnect(log.willReconnect, log)
this._reconnect()
}
const finalize = err => {
if (next) {
// Call only once when connect() is called
const nextHolder = next
next = null
return nextHolder(err)
}
return this.onError(err)
}
const timeoutHandler = () => {
this._cleanup()
finalize(new NesError('Connection timed out', errorTypes.TIMEOUT))
if (initial) {
return this._reconnect()
}
}
const timeout = options.timeout
? setTimeout(timeoutHandler, options.timeout)
: null
ws.onopen = () => {
clearTimeout(timeout)
ws.onopen = null
this._hello(options.auth)
.then(() => {
this.onConnect()
finalize()
})
.catch(err => {
if (err.path) {
delete this._subscriptions[err.path]
}
this._disconnect(() => nextTick(finalize)(err), true) // Stop reconnection when the hello message returns error
})
}
ws.onerror = event => {
clearTimeout(timeout)
if (this._willReconnect()) {
return reconnect(event)
}
this._cleanup()
const error = new NesError('Socket error', errorTypes.WS)
return finalize(error)
}
ws.onclose = reconnect
ws.onmessage = message => {
return this._onMessage(message)
}
}
Client.prototype.overrideReconnectionAuth = function(auth) {
if (!this._reconnection) {
return false
}
this._reconnection.settings.auth = auth
return true
}
Client.prototype.disconnect = function() {
return new Promise(resolve => this._disconnect(resolve, false))
}
Client.prototype._disconnect = function(next, isInternal) {
this._reconnection = null
clearTimeout(this._reconnectionTimer)
this._reconnectionTimer = null
const requested = this._disconnectRequested || !isInternal // Retain true
if (this._disconnectListeners) {
this._disconnectRequested = requested
this._disconnectListeners.push(next)
return
}
if (
!this._ws ||
(this._ws.readyState !== Client.WebSocket.OPEN &&
this._ws.readyState !== Client.WebSocket.CONNECTING)
) {
return next()
}
this._disconnectRequested = requested
this._disconnectListeners = [next]
this._ws.close()
}
Client.prototype._cleanup = function() {
if (this._ws) {
const ws = this._ws
this._ws = null
if (
ws.readyState === Client.WebSocket.OPEN ||
ws.readyState === Client.WebSocket.CONNECTING
) {
ws.close()
}
ws.onopen = null
ws.onclose = null
ws.onerror = ignore
ws.onmessage = null
}
this._packets = []
this.id = null
clearTimeout(this._heartbeat)
this._heartbeat = null
// Flush pending requests
const error = new NesError(
'Request failed - server disconnected',
errorTypes.DISCONNECT
)
const requests = this._requests
this._requests = {}
const ids = Object.keys(requests)
for (let i = 0; i < ids.length; ++i) {
const id = ids[i]
const request = requests[id]
clearTimeout(request.timeout)
request.reject(error)
}
if (this._disconnectListeners) {
const listeners = this._disconnectListeners
this._disconnectListeners = null
this._disconnectRequested = false
listeners.forEach(listener => listener())
}
}
Client.prototype._reconnect = function() {
// Reconnect
const reconnection = this._reconnection
if (!reconnection) {
return
}
if (reconnection.retries < 1) {
return this._disconnect(ignore, true) // Clear _reconnection state
}
--reconnection.retries
reconnection.wait = reconnection.wait + reconnection.delay
const timeout = Math.min(reconnection.wait, reconnection.maxDelay)
this._reconnectionTimer = setTimeout(() => {
this._connect(reconnection.settings, false, err => {
if (err) {
this.onError(err)
return this._reconnect()
}
})
}, timeout)
}
Client.prototype.request = function(options) {
if (typeof options === 'string') {
options = {
method: 'GET',
path: options
}
}
const request = {
type: 'request',
method: options.method || 'GET',
path: options.path,
headers: options.headers,
payload: options.payload
}
return this._send(request, true)
}
Client.prototype.message = function(message) {
const request = {
type: 'message',
message
}
return this._send(request, true)
}
Client.prototype._send = function(request, track) {
if (!this._ws || this._ws.readyState !== Client.WebSocket.OPEN) {
return Promise.reject(
new NesError(
'Failed to send message - server disconnected',
errorTypes.DISCONNECT
)
)
}
request.id = ++this._ids
try {
var encoded = stringify(request)
} catch (err) {
return Promise.reject(err)
}
// Ignore errors
if (!track) {
try {
this._ws.send(encoded)
return Promise.resolve()
} catch (err) {
return Promise.reject(new NesError(err, errorTypes.WS))
}
}
// Track errors
const record = {
resolve: null,
reject: null,
timeout: null
}
const promise = new Promise((resolve, reject) => {
record.resolve = resolve
record.reject = reject
})
if (this._settings.timeout) {
record.timeout = setTimeout(() => {
record.timeout = null
return record.reject(
new NesError('Request timed out', errorTypes.TIMEOUT)
)
}, this._settings.timeout)
}
this._requests[request.id] = record
try {
this._ws.send(encoded)
} catch (err) {
clearTimeout(this._requests[request.id].timeout)
delete this._requests[request.id]
return Promise.reject(new NesError(err, errorTypes.WS))
}
return promise
}
Client.prototype._hello = function(auth) {
const request = {
type: 'hello',
version
}
if (auth) {
request.auth = auth
}
const subs = this.subscriptions()
if (subs.length) {
request.subs = subs
}
return this._send(request, true)
}
Client.prototype.subscriptions = function() {
return Object.keys(this._subscriptions)
}
Client.prototype.subscribe = function(path, handler) {
if (!path || path[0] !== '/') {
return Promise.reject(new NesError('Invalid path', errorTypes.USER))
}
const subs = this._subscriptions[path]
if (subs) {
// Already subscribed
if (subs.indexOf(handler) === -1) {
subs.push(handler)
}
return Promise.resolve()
}
this._subscriptions[path] = [handler]
if (!this._ws || this._ws.readyState !== Client.WebSocket.OPEN) {
// Queued subscription
return Promise.resolve()
}
const request = {
type: 'sub',
path
}
const promise = this._send(request, true)
promise.catch(() => {
delete this._subscriptions[path]
})
return promise
}
Client.prototype.unsubscribe = function(path, handler) {
if (!path || path[0] !== '/') {
return Promise.reject(new NesError('Invalid path', errorTypes.USER))
}
const subs = this._subscriptions[path]
if (!subs) {
return Promise.resolve()
}
let sync = false
if (!handler) {
delete this._subscriptions[path]
sync = true
} else {
const pos = subs.indexOf(handler)
if (pos === -1) {
return Promise.resolve()
}
subs.splice(pos, 1)
if (!subs.length) {
delete this._subscriptions[path]
sync = true
}
}
if (!sync || !this._ws || this._ws.readyState !== Client.WebSocket.OPEN) {
return Promise.resolve()
}
const request = {
type: 'unsub',
path
}
const promise = this._send(request, true)
promise.catch(ignore) // Ignoring errors as the subscription handlers are already removed
return promise
}
Client.prototype._onMessage = function(message) {
this._beat()
let data = message.data
const prefix = data[0]
if (prefix !== '{') {
this._packets.push(data.slice(1))
if (prefix !== '!') {
return
}
data = this._packets.join('')
this._packets = []
}
if (this._packets.length) {
this._packets = []
this.onError(
new NesError('Received an incomplete message', errorTypes.PROTOCOL)
)
}
try {
var update = JSON.parse(data)
} catch (err) {
return this.onError(new NesError(err, errorTypes.PROTOCOL))
}
// Recreate error
let error = null
if (
update.statusCode &&
update.statusCode >= 400 &&
update.statusCode <= 599
) {
error = new NesError(
update.payload.message || update.payload.error || 'Error',
errorTypes.SERVER
)
error.statusCode = update.statusCode
error.data = update.payload
error.headers = update.headers
error.path = update.path
}
// Ping
if (update.type === 'ping') {
return this._send({ type: 'ping' }, false).catch(ignore) // Ignore errors
}
// Broadcast and update
if (update.type === 'update') {
return this.onUpdate(update.message)
}
// Publish or Revoke
if (update.type === 'pub' || update.type === 'revoke') {
const handlers = this._subscriptions[update.path]
if (update.type === 'revoke') {
delete this._subscriptions[update.path]
}
if (handlers && update.message !== undefined) {
const flags = {}
if (update.type === 'revoke') {
flags.revoked = true
}
for (let i = 0; i < handlers.length; ++i) {
handlers[i](update.message, flags)
}
}
return
}
// Lookup request (message must include an id from this point)
const request = this._requests[update.id]
if (!request) {
return this.onError(
new NesError(
'Received response for unknown request',
errorTypes.PROTOCOL
)
)
}
clearTimeout(request.timeout)
delete this._requests[update.id]
const next = (err, args) => {
if (err) {
return request.reject(err)
}
return request.resolve(args)
}
// Response
if (update.type === 'request') {
return next(error, {
payload: update.payload,
statusCode: update.statusCode,
headers: update.headers
})
}
// Custom message
if (update.type === 'message') {
return next(error, { payload: update.message })
}
// Authentication
if (update.type === 'hello') {
this.id = update.socket
if (update.heartbeat) {
this._heartbeatTimeout =
update.heartbeat.interval + update.heartbeat.timeout
this._beat() // Call again once timeout is set
}
return next(error)
}
// Subscriptions
if (update.type === 'sub' || update.type === 'unsub') {
return next(error)
}
next(new NesError('Received invalid response', errorTypes.PROTOCOL))
return this.onError(
new NesError(
'Received unknown response type: ' + update.type,
errorTypes.PROTOCOL
)
)
}
Client.prototype._beat = function() {
if (!this._heartbeatTimeout) {
return
}
clearTimeout(this._heartbeat)
this._heartbeat = setTimeout(() => {
this.onError(
new NesError(
'Disconnecting due to heartbeat timeout',
errorTypes.TIMEOUT
)
)
this.onHeartbeatTimeout(this._willReconnect())
this._ws.close()
}, this._heartbeatTimeout)
}
Client.prototype._willReconnect = function() {
return !!(this._reconnection && this._reconnection.retries >= 1)
}
// Expose interface
return { Client }
}
/**
* 小程序中模仿 web 中的 WebSocket
* imitate web WebSocket in wxapp
* @param {*} url
*/
function WebSocket(url) {
this.url = url
this.msg = ''
;['Open', 'Close', 'Error', 'Message'].forEach(eName => {
this[`_on${eName}`] = null
})
this.task = wx.connectSocket({
url,
success: msg => {
this.msg = msg
},
fail: err => {
this.msg = err
}
})
}
;['Open', 'Close', 'Error', 'Message'].forEach(eName => {
Object.defineProperty(WebSocket.prototype, `on${eName.toLowerCase()}`, {
enumerable: true,
configurable: true,
get() {
return this[`_on${eName}`]
},
set(val) {
this[`_on${eName}`] = val
this.task[`on${eName}`](this[`_on${eName}`])
}
})
})
;['send', 'close'].forEach(fnName => {
WebSocket.prototype[fnName] = function(data) {
this.task[fnName]({
data
})
}
})
@songlairui
Copy link
Author

在小程序中使用 Nes.Client

使用hapi搭建后端api时,想用websocket。官方已推荐使用nes,也提供了browser版的command module。但是在小程序中没有现成的js可用。

1. 怎么办?做个补丁。

补丁思路:将小程序中形如 wx.connectSocket 的api 变形为 new WebSocket() 的形式
上文已简易完成:weapp-websocket.js
// 提供了new WebSocket(url), onmessage... 等基本api。 自定义头,protocol 等参数,可同理适配。

2. 遇到第一个坑,小程序中这样一段代码,无法产生预期效果

Object.assign(Function('return this')(),{ok:'ok'})
console.info(typeof ok) // undefined

其中,第一行 Function 在小程序中,是被篡改过的,无法获取到根变量
第二行自然无法按照预期的完成。
这样就无法在代码中创建一个全局可直接通过字面访问的变量 WebSocket 了。
必须更改 nes/client 的代码了

3. 修改 nes/lib/client.js 代码

复制一份代码出来,做更改如 nes-client-mp.js:

  • 头部引入 WebSocket, 令 WebSocket 作用域内可访问
  • 简化模块导出模板,只保留了 es6 的方式

4. 使用

import Nes from './nes-client-mp.js'

const client = new Nes.Client('http://xxx.xx')

(async() => {
 await client.connect() // 可成功连接
// ... 其他操作
})()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment