Skip to content

Instantly share code, notes, and snippets.

@Jackarain
Last active June 28, 2025 07:48
Show Gist options
  • Save Jackarain/1fc67e57e1a5d0d62ddd4fefbd8f5884 to your computer and use it in GitHub Desktop.
Save Jackarain/1fc67e57e1a5d0d62ddd4fefbd8f5884 to your computer and use it in GitHub Desktop.
TcpConnection.ts 将 tcp 连接转换成能 await 读写的类,这在一些异步交互协议的读取中十分有效,从而避免 on('data') 的使用
import * as net from 'net';
// TCP 连接类,仅处理发送和接收
export class TcpConnection {
// Socket 由外部传入
private socket: net.Socket;
// 存储接收到的数据
private buffer: Buffer;
// 读取数据限制, 0 表示无限制
private readLimit: number;
// 存储接收到的数据 Promise 的解析器
private promiseResolve: ((data: Buffer) => void) | null;
// 存储接收到的数据 Promise 的拒绝器
private promiseReject: ((error: Error) => void) | null;
constructor(socket: net.Socket) {
this.socket = socket;
this.buffer = Buffer.alloc(0);
this.promiseResolve = null;
this.promiseReject = null;
this.readLimit = 0;
// 错误处理
this.socket.on('error', (err) => {
if (this.promiseReject) {
this.promiseReject(err);
this.promiseResolve = null;
this.promiseReject = null;
this.readLimit = 0;
}
});
// 连接关闭处理
this.socket.on('close', () => {
const err = new Error('Socket closed before receiving complete data');
if (this.promiseReject) {
this.promiseReject(err);
this.promiseResolve = null;
this.promiseReject = null;
this.readLimit = 0;
}
});
}
// 发送数据
public async send(data: Buffer | string): Promise<void> {
return new Promise((resolve, reject) => {
const buffer = typeof data === 'string' ? Buffer.from(data) : data;
this.socket.write(buffer, (err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}
// 接收数据,readLimit 为接收数据最大限制,默认0无限制收到多少返回多少
public async receive(readLimit: number = 0): Promise<Buffer> {
// 说明已经有读取操作正在进行中, 同时在一个 socket 上发起多个读取操作
// 是未定义行为, 所以这里抛出异常.
if (this.socket.listenerCount('data') != 0) {
throw new Error("Read operation already in progress");
}
// 保存读取限制
this.readLimit = readLimit;
// 返回已经有数据.
if (this.buffer.length > 0) {
if (this.readLimit > 0 && this.buffer.length > this.readLimit) {
const data = this.buffer.subarray(0, this.readLimit);
this.buffer = this.buffer.subarray(this.readLimit);
return data;
}
const data = this.buffer;
this.buffer = Buffer.alloc(0);
return data;
}
// 注册数据读取消息, 若收到数据,则在 dataHandler 调用下面 Promise 的
// resolve 函数(出错调用 reject),在下面 new Promise 时保存 resolve
// 和 reject 以便在 dataHandler 接收到数据时作调用.
this.socket.once('data', this.dataHandler);
// 返回这个读取操作的 Promise 对象
return new Promise((resolve, reject) => {
// 保存数据接收的 Promise 函数.
this.promiseResolve = resolve;
this.promiseReject = reject;
});
}
// 关闭连接
public close(): void {
this.socket.end();
}
// 可写属性
public get writable(): boolean {
return this.socket.writable;
}
private dataHandler = (chunk: Buffer) => {
this.buffer = Buffer.concat([this.buffer, chunk]);
if (this.promiseResolve) {
if (this.readLimit > 0 && this.buffer.length > this.readLimit) {
const data = this.buffer.subarray(0, this.readLimit);
this.buffer = this.buffer.subarray(this.readLimit);
this.promiseResolve(data);
} else {
this.promiseResolve(this.buffer);
this.buffer = Buffer.alloc(0);
}
this.promiseResolve = null;
this.promiseReject = null;
}
};
}
/**
* Length value 协议发送函数
* 按照 "6字节长度 + 数据" 的格式封装并发送数据
* 示例 00000ahello,baby 表示长度的6字节字符串为 00000a, hello,baby 为具体数据
* 也就是说调用 sendLVData(socket, "hello,baby") 将会以上面的数据编码发送
*
* @param socket - 网络套接字
* @param data - 要发送的原始数据
*/
export async function sendLVData(socket: TcpConnection, data: Buffer | string): Promise<void> {
const length = data.length.toString(16).padStart(6, '0');
const sendBuffer = Buffer.concat([
Buffer.from(length, 'ascii'),
Buffer.from(data),
]);
return socket.send(sendBuffer);
}
/**
* Length value 协议接收函数
* 从套接字接收并解析一个 "6字节长度(16进制字符串) + 数据" 格式的数据包
* 示例 000005hello 表示长度的6字节字符串为 000005, hello 为具体数据
*
* @param socket - 网络套接字 TcpConnection 对象
* @returns - Promise,成功时解析为收到的数据Buffer
*/
export async function receiveLVData(socket: TcpConnection): Promise<Buffer> {
return new Promise(async (resolve, reject) =>{
try {
let data = Buffer.alloc(0);
do {
const buffer = await socket.receive(6 - data.length);
data = Buffer.concat([data, buffer]);
} while (data.length < 6);
const lengthHex = data.subarray(0, 6).toString('ascii');
const length = parseInt(lengthHex, 16);
while (data.length < 6 + length) {
const buffer = await socket.receive((length + 6) - data.length);
data = Buffer.concat([data, buffer]);
}
resolve(data.subarray(6, 6 + length));
} catch (error) {
reject(error);
}
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment