Last active
October 31, 2021 18:49
-
-
Save ignoramous/f6ad24214e52c4ba2f28d366810344d1 to your computer and use it in GitHub Desktop.
adaptive lifo ring buffer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// SPDX-License-Identifier: CC0-1.0
// constant-time ring-buffer: https://news.ycombinator.com/item?id=14106577
/**
 * Fixed-capacity request queue backed by a ring buffer.
 *
 * Requests enter at the head (`push`) and leave either from the tail
 * (oldest first: `fifo`, `evict`) or from the head (newest first: `lifo`).
 * `poll` chooses between the two based on load (adaptive LIFO).
 *
 * A slot holding `null` is empty. `size` relies on that sentinel to tell a
 * full buffer from an empty one when head and tail coincide, so `fillerFn`
 * must never return `null`.
 */
class ReqQueue {
  /**
   * @param {number} cap - number of slots; must be a positive integer
   * @param {function(ReqQueue, *): *} fillerFn - builds the stored req from (queue, data)
   * @param {function(*): *} serverFn - receives a req taken by fifo() / lifo()
   * @param {function(*): *} cleanerFn - receives a req that is evicted or drained
   * @throws {RangeError} when cap is not a positive integer
   */
  constructor(cap, fillerFn, serverFn, cleanerFn) {
    // every index is computed modulo capacity; 0 or a fraction would
    // turn the indices into NaN / non-integers
    if (!Number.isInteger(cap) || cap <= 0) {
      throw new RangeError(`capacity must be a positive integer, got ${cap}`)
    }
    this.capacity = cap
    // filler creates a req
    this.filler = fillerFn
    // server accepts a req
    this.server = serverFn
    // cleaner cleans-up a req
    this.cleaner = cleanerFn
    // sets up rb, fillIdx, cleanIdx and emptyTime
    this.rewind()
  }

  /**
   * Replaces the ring buffer with a fresh, empty one and resets both
   * indices and the last-empty timestamp.
   *
   * @returns {Array|undefined} the previous buffer, un-cleaned
   *   (undefined on the very first call made by the constructor)
   */
  rewind() {
    const previous = this.rb
    this.rb = new Array(this.capacity).fill(null)
    // fillIdx points to the current empty accept slot
    this.fillIdx = 0
    // cleanIdx points to the yet-to-be cleaned slot; always trails fillIdx
    this.cleanIdx = 0
    // notes req buffer last empty time
    this.emptyTime = Date.now()
    return previous
  }

  /** @returns {number} the index after i, wrapping around at capacity */
  next(i) {
    return (this.capacity + i + 1) % this.capacity
  }

  /** @returns {number} index i wrapped into the buffer's range */
  cur(i) {
    return (this.capacity + i) % this.capacity
  }

  /** @returns {number} the index before i, wrapping around at 0 */
  prev(i) {
    return (this.capacity + i - 1) % this.capacity
  }

  /** Index of the slot the next push writes to. */
  get head() {
    return this.cur(this.fillIdx)
  }

  /** Index of the oldest slot that has not been popped yet. */
  get tail() {
    return this.cur(this.cleanIdx)
  }

  incrTail() {
    this.cleanIdx = this.next(this.cleanIdx)
  }

  incrHead() {
    this.fillIdx = this.next(this.fillIdx)
  }

  decrHead() {
    this.fillIdx = this.prev(this.fillIdx)
  }

  get full() {
    return this.size === this.capacity
  }

  get empty() {
    return this.size === 0
  }

  /**
   * Timestamp (ms) at which the queue was last observed empty.
   * NOTE: reading this while the queue is empty refreshes the timestamp.
   */
  get latestEmptyTime() {
    if (this.empty) {
      this.emptyTime = Date.now()
    }
    return this.emptyTime
  }

  /** Number of occupied slots, derived from head, tail and the null sentinel. */
  get size() {
    const h = this.head // current empty accept slot
    const t = this.tail // current yet-to-be cleaned slot
    if (h === t) {
      // head and tail overlap: a non-null slot means head wrapped around
      // and caught up with tail (full); a null slot means tail consumed
      // everything and caught up with head (empty)
      return this.rb[h] === null ? 0 : this.capacity
    }
    if (h > t) {
      return h - t
    }
    // head has wrapped around, tail has not
    return this.capacity + h - t
  }

  /**
   * Removes and returns the oldest req, or null when the queue is empty.
   * NOTE: a getter with side effects; kept as a getter so existing
   * `queue.pop` callers keep working.
   */
  get pop() {
    if (this.empty) {
      this.emptyTime = Date.now()
      return null
    }
    const slot = this.tail
    const out = this.rb[slot]
    this.rb[slot] = null
    this.incrTail()
    return out
  }

  /**
   * Removes and returns the newest req, or null when the queue is empty.
   * NOTE: a getter with side effects, like `pop`.
   */
  get rpop() {
    if (this.empty) {
      this.emptyTime = Date.now()
      return null
    }
    // head points at the next free slot; step back onto the newest req
    this.decrHead()
    const slot = this.head
    const out = this.rb[slot]
    this.rb[slot] = null
    return out
  }

  /** Drops the oldest req, if any, and hands it to the cleaner. */
  evict() {
    logd("evict, head/tail/size", this.head, this.tail, this.size)
    const out = this.pop
    if (out === null) return
    this.cleaner(out)
  }

  /** Serves the oldest req, if any. */
  fifo() {
    const first = this.pop
    if (first === null) return
    logd("fifo, head/tail/size", this.head, this.tail, this.size)
    this.server(first)
  }

  /** Serves the newest req, if any. */
  lifo() {
    const last = this.rpop
    if (last === null) return
    logd("lifo, head/tail/size", this.head, this.tail, this.size)
    this.server(last)
  }

  /**
   * Stores `filler(this, d)` at the head; when the buffer is full the
   * oldest req is evicted first to make room.
   */
  push(d) {
    if (this.full) {
      this.evict()
    }
    this.rb[this.head] = this.filler(this, d)
    this.incrHead()
  }

  /** True when more than 75% of the slots are occupied. */
  get highload() {
    return this.size > this.capacity * 0.75
  }

  /** Serves one req: newest-first under high load, oldest-first otherwise. */
  poll() {
    // adaptive lifo: https://queue.acm.org/detail.cfm?id=2839461
    if (this.highload) {
      return this.lifo()
    }
    return this.fifo()
  }

  /** Empties the queue, handing every stored req to the cleaner in slot order. */
  drain() {
    for (const req of this.rewind()) {
      // null marks an empty slot, same sentinel as evict / fifo / lifo
      if (req !== null) this.cleaner(req)
    }
  }
}
/**
 * Filler for ReqQueue: wraps a connection in a req carrying a deadline.
 *
 * Controlled delay (https://queue.acm.org/detail.cfm?id=2839461): when the
 * queue was last seen empty more than `upperMs` ago the req gets the short
 * `lowerMs` timeout, otherwise the long `upperMs` timeout.
 *
 * @param {ReqQueue} queue - queue the req is entering; only `latestEmptyTime` is read
 * @param {*} conn - connection to attach to the req
 * @returns {{conn: *, ttl: number}} req whose `ttl` is an absolute deadline in epoch ms
 */
function shape(queue, conn) {
  const now = Date.now()
  // the queue has not been empty for at least upperMs
  const backlogged = queue.latestEmptyTime < now - upperMs
  const timeout = backlogged ? lowerMs : upperMs
  const req = { conn, ttl: now + timeout }
  logd("shaping", req, "timeout", timeout)
  return req
}
/**
 * Accepts a connection. Placeholder: only logs for now.
 *
 * @param {*} conn - connection to serve
 */
function accept(conn) {
  logd("accepting", conn)
  // serve conn
}
/**
 * Server for ReqQueue: accepts the req's connection unless its deadline
 * has already passed, in which case the req is closed instead.
 *
 * @param {{conn: *, ttl: number}} req - req with an absolute deadline in epoch ms
 * @returns {*} whatever close() or accept() returns
 */
function serve(req) {
  const expired = Date.now() > req.ttl
  if (expired) return close(req)
  return accept(req.conn)
}
/**
 * Cleaner for ReqQueue. Placeholder: only logs for now.
 *
 * @param {{conn: *, ttl: number}} req - req whose connection should be closed
 */
function close(req) {
  logd("closing", req)
  // close req.conn
}
/**
 * Debug logger: forwards all arguments to console.debug, but only while
 * the `debug` flag is set.
 *
 * @param {...*} args - values to log
 */
function logd(...args) {
  if (debug) console.debug(...args)
}
// when true, logd() writes to console.debug
let debug = false
// req timeout (ms) used by shape() when the queue has not been empty for upperMs
const lowerMs = 500
// default req timeout (ms); also the backlog window shape() checks against
const upperMs = 5000
/**
 * Smoke test: pushes 999 reqs through a 100-slot queue while polling after
 * four out of every five pushes, so the queue gradually backs up, then
 * drains whatever is left. Output is only visible with `debug` enabled.
 */
function naivetest() {
  const rq = new ReqQueue(/*cap*/ 100, shape, serve, close)
  for (let i = 1; i < 1000; i++) {
    rq.push(i)
    // every 5th request is *not* polled
    if (i % 5 !== 0) rq.poll()
  }
  logd("head/tail/size/load", rq.head, rq.tail, rq.size, rq.highload)
  rq.drain()
  logd("fin head/tail/size/load", rq.head, rq.tail, rq.size, rq.highload)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment