Last active
November 6, 2022 18:41
-
-
Save baetheus/c850d5beab38f5fe358ef77b5a84e980 to your computer and use it in GitHub Desktop.
Quick Hack of Redis Client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import type { Either } from "fun/either.ts"; | |
import type { Option } from "fun/option.ts"; | |
import type { Hold } from "fun/kind.ts"; | |
import * as A from "fun/array.ts"; | |
import * as E from "fun/either.ts"; | |
import * as O from "fun/option.ts"; | |
import { flow, pipe, todo } from "fun/fn.ts"; | |
import { BufReader, BufWriter } from "https://deno.land/[email protected]/io/mod.ts"; | |
const encoder = new TextEncoder(); | |
const decoder = new TextDecoder(); | |
/** | |
* A try at implementing the redis RESP protocol detailed here: | |
* https://redis.io/docs/reference/protocol-spec/ | |
*/ | |
// === First Byte Consts === | |
// Termination | |
export const CR = "\r"; | |
export const CRLF = "\r\n"; | |
export type CRLF = typeof CRLF; | |
// Simple String | |
export const SSTR = "+"; | |
export type SSTR = typeof SSTR; | |
// Errors | |
export const ERRS = "-"; | |
export type ERRS = typeof ERRS; | |
// Integers | |
export const INTS = ":"; | |
export type INTS = typeof INTS; | |
// Bulk Strings | |
export const BSTR = "$"; | |
export type BSTR = typeof BSTR; | |
// Arrays | |
export const ARRS = "*"; | |
export type ARRS = typeof ARRS; | |
export const OK = "OK"; | |
export type OK = typeof OK; | |
// === Commands === | |
// String must not contain \r or \n | |
export type SimpleString = `${SSTR}${string}${CRLF}`; | |
// String must not contain \r or \n | |
export type ErrorString = `${ERRS}${string}${CRLF}`; | |
// Number must by an integer | |
// In some cases 0 denotes false and 1 denotes true | |
export type IntegerString = `${INTS}${number}${CRLF}`; | |
// Number is the number of bytes of the string payload | |
export type BulkStringLength = `${BSTR}${number}${CRLF}`; | |
// -1 denotes a null value, not an empty string | |
export type BulkNull = `${BSTR}-1${CRLF}`; | |
export const BulkNull = `${BSTR}-1${CRLF}` as BulkNull; | |
export type BulkString = `${BulkStringLength}${string}${CRLF}`; | |
// Number is the number of array entries | |
// String is a concatenation of RespMessages | |
export type ArrayStringLength = `${ARRS}${number}${CRLF}`; | |
export type ArrayString = `${ArrayStringLength}${string}`; | |
export type RespMessage = | |
| SimpleString | |
| ErrorString | |
| IntegerString | |
| BulkString | |
| BulkNull | |
| ArrayString; | |
// === Encode Resp Values === | |
function removeCRLF(s: string): string { | |
return s.replaceAll("\n", "").replaceAll("\r", ""); | |
} | |
export function simpleString(s: string): SimpleString { | |
return `${SSTR}${removeCRLF(s)}${CRLF}`; | |
} | |
export function errorString(s: string): ErrorString { | |
return `${ERRS}${removeCRLF(s)}${CRLF}`; | |
} | |
export function integerString(n: number): IntegerString { | |
return `${INTS}${Math.floor(n)}${CRLF}`; | |
} | |
export function bulkString(s: string | number): BulkString { | |
const length = encoder.encode(`${s}`).length; | |
return `${BSTR}${length}${CRLF}${s}${CRLF}`; | |
} | |
export function arrayString(...messages: RespMessage[]): ArrayString { | |
return `${ARRS}${messages.length}${CRLF}${messages.join("")}`; | |
} | |
// === Parsed Responses === | |
export type StringResult = string; // Newtype? | |
export type ErrorResult = string; // Newtype? | |
export type IntegerResult = number; // Integer? | |
export type BulkResult = Option<string>; | |
export type ArrayResult = readonly ParseResult[]; | |
export type ParseResult = | |
| StringResult | |
| ErrorResult | |
| IntegerResult | |
| BulkResult | |
| ArrayResult; | |
export type ParsedError = { | |
readonly tag: "ParsedError"; | |
readonly message: string; | |
}; | |
export function parsedError(message: string): ParsedError { | |
return { tag: "ParsedError", message }; | |
} | |
export type Parsed<A extends ParseResult> = Promise<Either<ParsedError, A>>; | |
// Parsing a response | |
export function printParseResult(result: ParseResult): string { | |
if (typeof result === "string") { | |
return result; | |
} else if (typeof result === "number") { | |
return result.toString(); | |
} else if (Array.isArray(result)) { | |
return result.map(printParseResult).join("\n"); | |
} else { | |
return pipe( | |
result as BulkResult, | |
O.fold(() => "nil", (r) => r), | |
); | |
} | |
} | |
async function readFullLine(reader: BufReader): Promise<null | string> { | |
const result = await reader.readString(CR); | |
if (result !== null) { | |
// RESP uses \r\n for termination so here we | |
// drop the \n from the buffer | |
await reader.readByte(); | |
} | |
// Pop the \r off the result | |
return result === null ? result : result.slice(0, result.length - 1); | |
} | |
const fromNull = E.fromNullable(() => parsedError("Unexpected null payload")); | |
// Expects that the initial RESP byte has been read from the buffer. | |
export async function parseString(reader: BufReader): Parsed<StringResult> { | |
const result = await readFullLine(reader); | |
return fromNull(result); | |
} | |
// Expects that the initial RESP byte has been read from the buffer. | |
export async function parseError(reader: BufReader): Parsed<ErrorResult> { | |
const result = await readFullLine(reader); | |
return fromNull(result); | |
} | |
// Expects that the initial RESP byte has been read from the buffer. | |
export async function parseInteger(reader: BufReader): Parsed<IntegerResult> { | |
const result = await readFullLine(reader); | |
return pipe( | |
result, | |
fromNull, | |
E.chain((str) => { | |
const num = parseInt(str, 10); | |
return isNaN(num) | |
? E.left(parsedError(`${str} cannot be parsed into an integer`)) | |
: E.right(num); | |
}), | |
); | |
} | |
// Expects that the initial RESP byte has been read from the buffer. | |
export async function parseBulk(reader: BufReader): Parsed<BulkResult> { | |
const _length = await readFullLine(reader); | |
// Ideally we would wrap this nicely in some type classes over Promise<Either> | |
// But I'm trying not to yak shave too hard | |
if (_length === null) { | |
return fromNull(_length); | |
} | |
const length = parseInt(_length, 10); | |
if (isNaN(length)) { | |
return E.left(parsedError("Unable to parse length of Bulk String")); | |
} | |
// This might need to be tightened to only length === -1 | |
if (length < 0) { | |
return E.right(O.none); | |
} | |
const buffer = new Uint8Array(length); | |
const result = await reader.readFull(buffer); | |
if (result === null) { | |
return fromNull(result); | |
} | |
// Pop the remaining \r\n off the bulk reply | |
const last = await readFullLine(reader); | |
if (last === null) { | |
return E.left(parsedError("Bulk string must end in CRLF")); | |
} | |
return E.right(O.some(decoder.decode(result))); | |
} | |
// Expects that the initial RESP byte has been read from the buffer. | |
export async function parseArray( | |
reader: BufReader, | |
): Parsed<ArrayResult> { | |
const _length = await readFullLine(reader); | |
if (_length === null) { | |
return fromNull(_length); | |
} | |
const length = parseInt(_length, 10); | |
const results: ParseResult[] = []; | |
let index = -1; | |
while (++index < length) { | |
const result = await parse(reader); | |
if (E.isLeft(result)) { | |
return result; | |
} | |
results[index] = result.right; | |
} | |
return E.right(results); | |
} | |
// This function does not clean up the BufReader | |
export async function parse( | |
reader: BufReader, | |
): Parsed<ParseResult> { | |
const _first = await reader.readByte(); | |
if (_first === null) { | |
return fromNull(_first); | |
} | |
const first = String.fromCharCode(_first); | |
switch (first) { | |
case SSTR: | |
return parseString(reader); | |
case ERRS: | |
return parseError(reader); | |
case INTS: | |
return parseInteger(reader); | |
case BSTR: | |
return parseBulk(reader); | |
case ARRS: | |
return parseArray(reader); | |
default: | |
return E.left(parsedError(`Unknown RESP symbol ${first}`)); | |
} | |
} | |
// === Connection === | |
/** | |
* When interacting with redis our primary interface is a | |
* BufReader and BufWriter pair and a TcpConnection | |
*/ | |
export type RedisConnection = { | |
readonly connection: Deno.TcpConn; | |
readonly reader: BufReader; | |
readonly writer: BufWriter; | |
}; | |
/** | |
* For now I'm only implementing RedisConnection from a TcpConnection with | |
* no concept of retry or error. | |
*/ | |
export function fromTcpConnection(connection: Deno.TcpConn): RedisConnection { | |
// TODO wrap BufReader and BufWriter in tryCatch or reduce the interface | |
// so it's not so big | |
return { | |
connection, | |
reader: new BufReader(connection), | |
writer: new BufWriter(connection), | |
}; | |
} | |
const sequenceEither = A.createSequence(E.MonadEither); | |
/** | |
* Right now all of our messages are strings and not U8IntArrays. In the future | |
* we might want to change this for performance or interop reasons. | |
* | |
* TODO: Use AsyncEither? FnAsyncEither? | |
* | |
* TODO: Build on top of sendMessage a "Decoder" interface at | |
* the type level and runtime level to ensure correct responses. | |
*/ | |
export async function sendMessage( | |
redis: RedisConnection, | |
message: RespMessage, | |
): Parsed<ParseResult>; | |
export async function sendMessage( | |
redis: RedisConnection, | |
message: RespMessage[], | |
): Parsed<ParseResult[]>; | |
export async function sendMessage( | |
redis: RedisConnection, | |
message: RespMessage | RespMessage[], | |
): Promise<Either<ParsedError, ParseResult | ParseResult[]>> { | |
const multi = Array.isArray(message); | |
const msg = Array.isArray(message) ? message.join("") : message; | |
await redis.writer.write(encoder.encode(msg)); | |
await redis.writer.flush(); | |
if (multi) { | |
const results: Either<ParsedError, ParseResult>[] = new Array( | |
message.length, | |
); | |
let index = -1; | |
while (++index < message.length) { | |
results[index] = await parse(redis.reader); | |
} | |
return sequenceEither(results); | |
} | |
return parse(redis.reader); | |
} | |
// === Basic Commands === | |
export type Command<A> = Hold<A> & RespMessage; | |
// deno-lint-ignore no-explicit-any | |
export type AnyCommand = Command<any>; | |
export type FromCommand<T> = T extends Command<infer A> ? A : never; | |
// Helper | |
function cmd(...vs: string[]): RespMessage { | |
return arrayString(...vs.map(bulkString)); | |
} | |
export function pipeline<C extends AnyCommand[]>(...cmds: C): C { | |
return cmds; | |
} | |
// APPEND | |
export function append(key: string, value: string): Command<number> { | |
return cmd("append", key, value); | |
} | |
// DECR | |
export function decr(key: string): Command<number> { | |
return cmd("decr", key); | |
} | |
// DECRBY | |
export function decrby(key: string, decrement: number): Command<number> { | |
return arrayString( | |
bulkString("decrby"), | |
bulkString(key), | |
integerString(decrement), | |
); | |
} | |
// GET | |
export function get(key: string): Command<Option<string>> { | |
return cmd("get", key); | |
} | |
// GETDEL | |
export function getdel(key: string): Command<Option<string>> { | |
return cmd("getdel", key); | |
} | |
// SET | |
export function set(key: string, value: string): Command<OK> { | |
return cmd("set", key, value); | |
} | |
// GETRANGE | |
// GETSET | |
// INCR | |
// INCRBY | |
// INCRBYFLOAT | |
// LCS | |
// MGET | |
// MSET | |
// MSETNX | |
// PSETEX | |
// SETEX | |
// SETNX | |
// SETRANGE | |
// STRLEN | |
// SUBSTR | |
// === Testing === | |
type Send = { | |
(cmd: RespMessage): Parsed<ParseResult>; | |
(cmds: RespMessage[]): Parsed<ParseResult[]>; | |
}; | |
function getSend(port: number): Send { | |
return async (cmd) => { | |
const conn = await Deno.connect({ port }); | |
const redis = fromTcpConnection(conn); | |
const result = await sendMessage(redis, cmd as any); | |
conn.close(); | |
return result as any; | |
}; | |
} | |
const send = getSend(6379); | |
// await log(set("hello", "world")); | |
// await log(get("hello")); | |
const piped = pipeline( | |
get("hello"), | |
get("hello"), | |
get("hello"), | |
get("hello"), | |
get("hello"), | |
get("hello"), | |
get("hello"), | |
); | |
const result = await send(piped); | |
console.log(JSON.stringify(result, null, 2)); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment