Skip to content

Instantly share code, notes, and snippets.

@baetheus
Last active November 6, 2022 18:41
Show Gist options
  • Save baetheus/c850d5beab38f5fe358ef77b5a84e980 to your computer and use it in GitHub Desktop.
Save baetheus/c850d5beab38f5fe358ef77b5a84e980 to your computer and use it in GitHub Desktop.
Quick Hack of Redis Client
import type { Either } from "fun/either.ts";
import type { Option } from "fun/option.ts";
import type { Hold } from "fun/kind.ts";
import * as A from "fun/array.ts";
import * as E from "fun/either.ts";
import * as O from "fun/option.ts";
import { flow, pipe, todo } from "fun/fn.ts";
import { BufReader, BufWriter } from "https://deno.land/[email protected]/io/mod.ts";
const encoder = new TextEncoder();
const decoder = new TextDecoder();
/**
* A try at implementing the redis RESP protocol detailed here:
* https://redis.io/docs/reference/protocol-spec/
*/
// === First Byte Consts ===
// Termination
export const CR = "\r";
export const CRLF = "\r\n";
export type CRLF = typeof CRLF;
// Simple String
export const SSTR = "+";
export type SSTR = typeof SSTR;
// Errors
export const ERRS = "-";
export type ERRS = typeof ERRS;
// Integers
export const INTS = ":";
export type INTS = typeof INTS;
// Bulk Strings
export const BSTR = "$";
export type BSTR = typeof BSTR;
// Arrays
export const ARRS = "*";
export type ARRS = typeof ARRS;
export const OK = "OK";
export type OK = typeof OK;
// === Commands ===
// String must not contain \r or \n
export type SimpleString = `${SSTR}${string}${CRLF}`;
// String must not contain \r or \n
export type ErrorString = `${ERRS}${string}${CRLF}`;
// Number must by an integer
// In some cases 0 denotes false and 1 denotes true
export type IntegerString = `${INTS}${number}${CRLF}`;
// Number is the number of bytes of the string payload
export type BulkStringLength = `${BSTR}${number}${CRLF}`;
// -1 denotes a null value, not an empty string
export type BulkNull = `${BSTR}-1${CRLF}`;
export const BulkNull = `${BSTR}-1${CRLF}` as BulkNull;
export type BulkString = `${BulkStringLength}${string}${CRLF}`;
// Number is the number of array entries
// String is a concatenation of RespMessages
export type ArrayStringLength = `${ARRS}${number}${CRLF}`;
export type ArrayString = `${ArrayStringLength}${string}`;
export type RespMessage =
| SimpleString
| ErrorString
| IntegerString
| BulkString
| BulkNull
| ArrayString;
// === Encode Resp Values ===
function removeCRLF(s: string): string {
return s.replaceAll("\n", "").replaceAll("\r", "");
}
export function simpleString(s: string): SimpleString {
return `${SSTR}${removeCRLF(s)}${CRLF}`;
}
export function errorString(s: string): ErrorString {
return `${ERRS}${removeCRLF(s)}${CRLF}`;
}
export function integerString(n: number): IntegerString {
return `${INTS}${Math.floor(n)}${CRLF}`;
}
export function bulkString(s: string | number): BulkString {
const length = encoder.encode(`${s}`).length;
return `${BSTR}${length}${CRLF}${s}${CRLF}`;
}
export function arrayString(...messages: RespMessage[]): ArrayString {
return `${ARRS}${messages.length}${CRLF}${messages.join("")}`;
}
// === Parsed Responses ===
export type StringResult = string; // Newtype?
export type ErrorResult = string; // Newtype?
export type IntegerResult = number; // Integer?
export type BulkResult = Option<string>;
export type ArrayResult = readonly ParseResult[];
export type ParseResult =
| StringResult
| ErrorResult
| IntegerResult
| BulkResult
| ArrayResult;
export type ParsedError = {
readonly tag: "ParsedError";
readonly message: string;
};
export function parsedError(message: string): ParsedError {
return { tag: "ParsedError", message };
}
export type Parsed<A extends ParseResult> = Promise<Either<ParsedError, A>>;
// Parsing a response
export function printParseResult(result: ParseResult): string {
if (typeof result === "string") {
return result;
} else if (typeof result === "number") {
return result.toString();
} else if (Array.isArray(result)) {
return result.map(printParseResult).join("\n");
} else {
return pipe(
result as BulkResult,
O.fold(() => "nil", (r) => r),
);
}
}
async function readFullLine(reader: BufReader): Promise<null | string> {
const result = await reader.readString(CR);
if (result !== null) {
// RESP uses \r\n for termination so here we
// drop the \n from the buffer
await reader.readByte();
}
// Pop the \r off the result
return result === null ? result : result.slice(0, result.length - 1);
}
const fromNull = E.fromNullable(() => parsedError("Unexpected null payload"));
// Expects that the initial RESP byte has been read from the buffer.
export async function parseString(reader: BufReader): Parsed<StringResult> {
const result = await readFullLine(reader);
return fromNull(result);
}
// Expects that the initial RESP byte has been read from the buffer.
export async function parseError(reader: BufReader): Parsed<ErrorResult> {
const result = await readFullLine(reader);
return fromNull(result);
}
// Expects that the initial RESP byte has been read from the buffer.
export async function parseInteger(reader: BufReader): Parsed<IntegerResult> {
const result = await readFullLine(reader);
return pipe(
result,
fromNull,
E.chain((str) => {
const num = parseInt(str, 10);
return isNaN(num)
? E.left(parsedError(`${str} cannot be parsed into an integer`))
: E.right(num);
}),
);
}
// Expects that the initial RESP byte has been read from the buffer.
export async function parseBulk(reader: BufReader): Parsed<BulkResult> {
const _length = await readFullLine(reader);
// Ideally we would wrap this nicely in some type classes over Promise<Either>
// But I'm trying not to yak shave too hard
if (_length === null) {
return fromNull(_length);
}
const length = parseInt(_length, 10);
if (isNaN(length)) {
return E.left(parsedError("Unable to parse length of Bulk String"));
}
// This might need to be tightened to only length === -1
if (length < 0) {
return E.right(O.none);
}
const buffer = new Uint8Array(length);
const result = await reader.readFull(buffer);
if (result === null) {
return fromNull(result);
}
// Pop the remaining \r\n off the bulk reply
const last = await readFullLine(reader);
if (last === null) {
return E.left(parsedError("Bulk string must end in CRLF"));
}
return E.right(O.some(decoder.decode(result)));
}
// Expects that the initial RESP byte has been read from the buffer.
export async function parseArray(
reader: BufReader,
): Parsed<ArrayResult> {
const _length = await readFullLine(reader);
if (_length === null) {
return fromNull(_length);
}
const length = parseInt(_length, 10);
const results: ParseResult[] = [];
let index = -1;
while (++index < length) {
const result = await parse(reader);
if (E.isLeft(result)) {
return result;
}
results[index] = result.right;
}
return E.right(results);
}
// This function does not clean up the BufReader
export async function parse(
reader: BufReader,
): Parsed<ParseResult> {
const _first = await reader.readByte();
if (_first === null) {
return fromNull(_first);
}
const first = String.fromCharCode(_first);
switch (first) {
case SSTR:
return parseString(reader);
case ERRS:
return parseError(reader);
case INTS:
return parseInteger(reader);
case BSTR:
return parseBulk(reader);
case ARRS:
return parseArray(reader);
default:
return E.left(parsedError(`Unknown RESP symbol ${first}`));
}
}
// === Connection ===
/**
* When interacting with redis our primary interface is a
* BufReader and BufWriter pair and a TcpConnection
*/
export type RedisConnection = {
readonly connection: Deno.TcpConn;
readonly reader: BufReader;
readonly writer: BufWriter;
};
/**
* For now I'm only implementing RedisConnection from a TcpConnection with
* no concept of retry or error.
*/
export function fromTcpConnection(connection: Deno.TcpConn): RedisConnection {
// TODO wrap BufReader and BufWriter in tryCatch or reduce the interface
// so it's not so big
return {
connection,
reader: new BufReader(connection),
writer: new BufWriter(connection),
};
}
const sequenceEither = A.createSequence(E.MonadEither);
/**
* Right now all of our messages are strings and not U8IntArrays. In the future
* we might want to change this for performance or interop reasons.
*
* TODO: Use AsyncEither? FnAsyncEither?
*
* TODO: Build on top of sendMessage a "Decoder" interface at
* the type level and runtime level to ensure correct responses.
*/
export async function sendMessage(
redis: RedisConnection,
message: RespMessage,
): Parsed<ParseResult>;
export async function sendMessage(
redis: RedisConnection,
message: RespMessage[],
): Parsed<ParseResult[]>;
export async function sendMessage(
redis: RedisConnection,
message: RespMessage | RespMessage[],
): Promise<Either<ParsedError, ParseResult | ParseResult[]>> {
const multi = Array.isArray(message);
const msg = Array.isArray(message) ? message.join("") : message;
await redis.writer.write(encoder.encode(msg));
await redis.writer.flush();
if (multi) {
const results: Either<ParsedError, ParseResult>[] = new Array(
message.length,
);
let index = -1;
while (++index < message.length) {
results[index] = await parse(redis.reader);
}
return sequenceEither(results);
}
return parse(redis.reader);
}
// === Basic Commands ===
export type Command<A> = Hold<A> & RespMessage;
// deno-lint-ignore no-explicit-any
export type AnyCommand = Command<any>;
export type FromCommand<T> = T extends Command<infer A> ? A : never;
// Helper
function cmd(...vs: string[]): RespMessage {
return arrayString(...vs.map(bulkString));
}
export function pipeline<C extends AnyCommand[]>(...cmds: C): C {
return cmds;
}
// APPEND
export function append(key: string, value: string): Command<number> {
return cmd("append", key, value);
}
// DECR
export function decr(key: string): Command<number> {
return cmd("decr", key);
}
// DECRBY
export function decrby(key: string, decrement: number): Command<number> {
return arrayString(
bulkString("decrby"),
bulkString(key),
integerString(decrement),
);
}
// GET
export function get(key: string): Command<Option<string>> {
return cmd("get", key);
}
// GETDEL
export function getdel(key: string): Command<Option<string>> {
return cmd("getdel", key);
}
// SET
export function set(key: string, value: string): Command<OK> {
return cmd("set", key, value);
}
// GETRANGE
// GETSET
// INCR
// INCRBY
// INCRBYFLOAT
// LCS
// MGET
// MSET
// MSETNX
// PSETEX
// SETEX
// SETNX
// SETRANGE
// STRLEN
// SUBSTR
// === Testing ===
type Send = {
(cmd: RespMessage): Parsed<ParseResult>;
(cmds: RespMessage[]): Parsed<ParseResult[]>;
};
function getSend(port: number): Send {
return async (cmd) => {
const conn = await Deno.connect({ port });
const redis = fromTcpConnection(conn);
const result = await sendMessage(redis, cmd as any);
conn.close();
return result as any;
};
}
const send = getSend(6379);
// await log(set("hello", "world"));
// await log(get("hello"));
const piped = pipeline(
get("hello"),
get("hello"),
get("hello"),
get("hello"),
get("hello"),
get("hello"),
get("hello"),
);
const result = await send(piped);
console.log(JSON.stringify(result, null, 2));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment