Last active
May 26, 2020 11:36
-
-
Save lexi-lambda/c54c91867f931b56123e3c595d8e445a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#lang racket/base | |
; Lock-free FIFO queues, loosely based on “Simple, Fast, and Practical | |
; Non-Blocking and Blocking Concurrent Queue Algorithms”. | |
(require racket/contract | |
racket/future | |
(only-in racket/unsafe/ops unsafe-struct*-cas!)) | |
(provide queue? | |
make-queue | |
(contract-out [enqueue! (-> queue? any/c void?)] | |
[dequeue! (-> queue? any/c)] | |
[try-dequeue! (-> queue? any/c)] | |
[queue-length (-> queue? exact-nonnegative-integer?)])) | |
; A queue is two pointers into a mutable, singly-linked list of nodes: | |
; head: Points at the first node in the list. | |
; tail: Usually points at the last node in the list, but briefly points at the | |
; second-to-last node during an enqueue! operation. | |
; It also contains a future semaphore, which allows dequeue! to block on an | |
; empty queue (and doubles as a length counter). | |
; | |
; Note that head and tail are *always* nodes, even if the queue is empty. To | |
; maintain this invariant, head does not actually contain the first value in the | |
; queue. Rather, the first value in the queue is (node-value (node-next head)). | |
; This allows enqueue! to only touch the tail and dequeue! to only touch the | |
; head, avoiding a need for synchronization between modifications to both fields. | |
(struct queue ([head #:mutable] [tail #:mutable] semaphore) #:authentic) | |
; Mutable pairs a la mcons with support for atomic updates. | |
(struct node ([value #:mutable] [next #:mutable]) #:authentic) | |
; It would be cool if #:authentic generated these for mutable fields. | |
(define (queue-head-cas! q old new) | |
(unless (queue? q) (raise-argument-error 'queue-head-cas! "queue?" q)) | |
(unsafe-struct*-cas! q 0 old new)) | |
(define (queue-tail-cas! q old new) | |
(unless (queue? q) (raise-argument-error 'queue-tail-cas! "queue?" q)) | |
(unsafe-struct*-cas! q 1 old new)) | |
(define (node-next-cas! n old new) | |
(unless (node? n) (raise-argument-error 'node-next-cas! "node?" n)) | |
(unsafe-struct*-cas! n 1 old new)) | |
(define (make-queue) | |
(define n (node #f #f)) | |
(queue n n (make-fsemaphore 0))) | |
(define (queue-length q) | |
(fsemaphore-count (queue-semaphore q))) | |
(define (enqueue! q v) | |
(define new (node v #f)) | |
(let retry () | |
(define old (queue-tail q)) | |
(cond | |
; If (node-next old) is #f, then it is the end of the node list, so we can | |
; just insert our new node there. | |
[(node-next-cas! old #f new) | |
; We update tail via CAS because concurrent enqueue! operations will | |
; advance it themselves if it falls behind so they don’t have to wait for | |
; us. By the time we get a turn, tail may have been advanced past our | |
; node, so we want to just return. | |
(queue-tail-cas! q old new) | |
(fsemaphore-post (queue-semaphore q))] | |
[else | |
; Another enqueue! beat us; ensure the tail is updated and try again. | |
(queue-tail-cas! q old (node-next old)) | |
(retry)]))) | |
(define (dequeue! q) | |
(fsemaphore-wait (queue-semaphore q)) | |
(do-dequeue! q)) | |
(define (try-dequeue! q) | |
(and (fsemaphore-try-wait? (queue-semaphore q)) | |
(do-dequeue! q))) | |
; Precondition: q is non-empty. | |
(define (do-dequeue! q) | |
(let retry () | |
(define old (queue-head q)) | |
(define new (node-next old)) | |
(cond | |
[(queue-head-cas! q old new) | |
; Return the dequeued value, setting the value of the new head to #f | |
; to preserve invariant F. This ensures we don’t needlessly keep the | |
; GC from collecting the value. | |
(define value (node-value new)) | |
(set-node-value! new #f) | |
value] | |
[else | |
; Another dequeue! beat us; try again. | |
(retry)]))) | |
(module* test racket/base | |
(require racket/future rackunit (submod "..")) | |
(define (box-add! b n) | |
(let retry () | |
(define old (unbox b)) | |
(unless (box-cas! b old (+ old n)) | |
(retry)))) | |
(define (run-test-configuration #:futures futures | |
#:elements elements | |
#:iterations iterations) | |
(with-check-info (['futures futures] | |
['elements elements] | |
['iterations iterations]) | |
(test-begin | |
(define queue (make-queue)) | |
(define total (box 0)) | |
(define producers | |
(for/list ([i (in-range futures)]) | |
(future (λ () (for ([j (in-range elements)]) | |
(enqueue! queue (for/sum ([k (in-range iterations)]) | |
(+ (* elements iterations i) | |
(* iterations j) | |
k)))))))) | |
(define consumers | |
(for/list ([i (in-range futures)]) | |
(future (λ () (for ([j (in-range elements)]) | |
(box-add! total (dequeue! queue))))))) | |
(define forcers | |
(for/list ([f (in-sequences producers consumers)]) | |
(thread (λ () (touch f))))) | |
(for-each thread-wait forcers) | |
(check-equal? (unbox total) (for/sum ([i (* futures elements iterations)]) i))))) | |
(run-test-configuration #:futures 10 #:elements 1000 #:iterations 1000) | |
(run-test-configuration #:futures 3 #:elements 10000 #:iterations 500) | |
(run-test-configuration #:futures 50 #:elements 1000 #:iterations 200)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment