Last active
June 28, 2025 07:48
-
-
Save Jackarain/1fc67e57e1a5d0d62ddd4fefbd8f5884 to your computer and use it in GitHub Desktop.
TcpConnection.ts 将 tcp 连接转换成能 await 读写的类,这在一些异步交互协议的读取中十分有效,从而避免 on('data') 的使用
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import * as net from 'net'; | |
// TCP 连接类,仅处理发送和接收 | |
export class TcpConnection { | |
// Socket 由外部传入 | |
private socket: net.Socket; | |
// 存储接收到的数据 | |
private buffer: Buffer; | |
// 读取数据限制, 0 表示无限制 | |
private readLimit: number; | |
// 存储接收到的数据 Promise 的解析器 | |
private promiseResolve: ((data: Buffer) => void) | null; | |
// 存储接收到的数据 Promise 的拒绝器 | |
private promiseReject: ((error: Error) => void) | null; | |
constructor(socket: net.Socket) { | |
this.socket = socket; | |
this.buffer = Buffer.alloc(0); | |
this.promiseResolve = null; | |
this.promiseReject = null; | |
this.readLimit = 0; | |
// 错误处理 | |
this.socket.on('error', (err) => { | |
if (this.promiseReject) { | |
this.promiseReject(err); | |
this.promiseResolve = null; | |
this.promiseReject = null; | |
this.readLimit = 0; | |
} | |
}); | |
// 连接关闭处理 | |
this.socket.on('close', () => { | |
const err = new Error('Socket closed before receiving complete data'); | |
if (this.promiseReject) { | |
this.promiseReject(err); | |
this.promiseResolve = null; | |
this.promiseReject = null; | |
this.readLimit = 0; | |
} | |
}); | |
} | |
// 发送数据 | |
public async send(data: Buffer | string): Promise<void> { | |
return new Promise((resolve, reject) => { | |
const buffer = typeof data === 'string' ? Buffer.from(data) : data; | |
this.socket.write(buffer, (err) => { | |
if (err) { | |
reject(err); | |
} else { | |
resolve(); | |
} | |
}); | |
}); | |
} | |
// 接收数据,readLimit 为接收数据最大限制,默认0无限制收到多少返回多少 | |
public async receive(readLimit: number = 0): Promise<Buffer> { | |
// 说明已经有读取操作正在进行中, 同时在一个 socket 上发起多个读取操作 | |
// 是未定义行为, 所以这里抛出异常. | |
if (this.socket.listenerCount('data') != 0) { | |
throw new Error("Read operation already in progress"); | |
} | |
// 保存读取限制 | |
this.readLimit = readLimit; | |
// 返回已经有数据. | |
if (this.buffer.length > 0) { | |
if (this.readLimit > 0 && this.buffer.length > this.readLimit) { | |
const data = this.buffer.subarray(0, this.readLimit); | |
this.buffer = this.buffer.subarray(this.readLimit); | |
return data; | |
} | |
const data = this.buffer; | |
this.buffer = Buffer.alloc(0); | |
return data; | |
} | |
// 注册数据读取消息, 若收到数据,则在 dataHandler 调用下面 Promise 的 | |
// resolve 函数(出错调用 reject),在下面 new Promise 时保存 resolve | |
// 和 reject 以便在 dataHandler 接收到数据时作调用. | |
this.socket.once('data', this.dataHandler); | |
// 返回这个读取操作的 Promise 对象 | |
return new Promise((resolve, reject) => { | |
// 保存数据接收的 Promise 函数. | |
this.promiseResolve = resolve; | |
this.promiseReject = reject; | |
}); | |
} | |
// 关闭连接 | |
public close(): void { | |
this.socket.end(); | |
} | |
// 可写属性 | |
public get writable(): boolean { | |
return this.socket.writable; | |
} | |
private dataHandler = (chunk: Buffer) => { | |
this.buffer = Buffer.concat([this.buffer, chunk]); | |
if (this.promiseResolve) { | |
if (this.readLimit > 0 && this.buffer.length > this.readLimit) { | |
const data = this.buffer.subarray(0, this.readLimit); | |
this.buffer = this.buffer.subarray(this.readLimit); | |
this.promiseResolve(data); | |
} else { | |
this.promiseResolve(this.buffer); | |
this.buffer = Buffer.alloc(0); | |
} | |
this.promiseResolve = null; | |
this.promiseReject = null; | |
} | |
}; | |
} | |
/** | |
* Length value 协议发送函数 | |
* 按照 "6字节长度 + 数据" 的格式封装并发送数据 | |
* 示例 00000ahello,baby 表示长度的6字节字符串为 00000a, hello,baby 为具体数据 | |
* 也就是说调用 sendLVData(socket, "hello,baby") 将会以上面的数据编码发送 | |
* | |
* @param socket - 网络套接字 | |
* @param data - 要发送的原始数据 | |
*/ | |
export async function sendLVData(socket: TcpConnection, data: Buffer | string): Promise<void> { | |
const length = data.length.toString(16).padStart(6, '0'); | |
const sendBuffer = Buffer.concat([ | |
Buffer.from(length, 'ascii'), | |
Buffer.from(data), | |
]); | |
return socket.send(sendBuffer); | |
} | |
/** | |
* Length value 协议接收函数 | |
* 从套接字接收并解析一个 "6字节长度(16进制字符串) + 数据" 格式的数据包 | |
* 示例 000005hello 表示长度的6字节字符串为 000005, hello 为具体数据 | |
* | |
* @param socket - 网络套接字 TcpConnection 对象 | |
* @returns - Promise,成功时解析为收到的数据Buffer | |
*/ | |
export async function receiveLVData(socket: TcpConnection): Promise<Buffer> { | |
return new Promise(async (resolve, reject) =>{ | |
try { | |
let data = Buffer.alloc(0); | |
do { | |
const buffer = await socket.receive(6 - data.length); | |
data = Buffer.concat([data, buffer]); | |
} while (data.length < 6); | |
const lengthHex = data.subarray(0, 6).toString('ascii'); | |
const length = parseInt(lengthHex, 16); | |
while (data.length < 6 + length) { | |
const buffer = await socket.receive((length + 6) - data.length); | |
data = Buffer.concat([data, buffer]); | |
} | |
resolve(data.subarray(6, 6 + length)); | |
} catch (error) { | |
reject(error); | |
} | |
}); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment