Last active
August 29, 2015 13:56
-
-
Save ktakashi/9150137 to your computer and use it in GitHub Desktop.
Piped port using thread
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!r6rs | |
(library (port-pipe) | |
(export call-with-port-pipe) | |
(import (rnrs) (rnrs mutable-pairs) | |
(srfi :18)) | |
;; Creates an empty queue.
;; The queue is a single mutable pair: its car holds the list of items
;; (front first) and its cdr holds the last pair of that list.  Both are
;; '() while the queue is empty.
(define make-queue
  (lambda ()
    (cons '() '())))
;; Returns #t when QUEUE holds no items, i.e. when its front list is '().
(define (queue-empty? queue)
  (let ((front (car queue)))
    (null? front)))
;; Returns the oldest item in QUEUE without removing it.
;; An empty queue yields the end-of-file object instead of raising.
(define (queue-front queue)
  (cond ((queue-empty? queue) (eof-object))
        (else (car (car queue)))))
;; Appends ITEM at the rear of QUEUE and returns QUEUE.
(define (enqueue! queue item)
  (let ((cell (list item)))
    (cond ((queue-empty? queue)
           ;; First item: the new cell is also the front of the list.
           (set-car! queue cell))
          (else
           ;; Link the new cell after the current last pair.
           (set-cdr! (cdr queue) cell)))
    ;; In both cases the new cell becomes the rear pointer.
    (set-cdr! queue cell)
    queue))
;; Removes and returns the oldest item in QUEUE.
;; An empty queue is left untouched and yields the end-of-file object.
(define (dequeue! queue)
  (cond ((queue-empty? queue) (eof-object))
        (else
         (let* ((head (car queue))
                (remaining (cdr head)))
           ;; Advance the front pointer; this is '() when HEAD was the
           ;; only pair left.
           (set-car! queue remaining)
           ;; Dropping the last item must also reset the rear pointer.
           (when (null? remaining)
             (set-cdr! queue '()))
           (car head)))))
;; (inc! var)       adds 1 to the variable VAR in place.
;; (inc! var delta) adds DELTA to the variable VAR in place.
(define-syntax inc!
  (syntax-rules ()
    ((_ var) (set! var (+ var 1)))
    ((_ var delta) (set! var (+ var delta)))))
;; (dec! var)       subtracts 1 from the variable VAR in place.
;; (dec! var delta) subtracts DELTA from the variable VAR in place.
(define-syntax dec!
  (syntax-rules ()
    ((_ var) (set! var (- var 1)))
    ((_ var delta) (set! var (- var delta)))))
;; Connects two procedures through an in-memory pipe, each running in its
;; own SRFI-18 thread.
;;
;;   productor : procedure of one argument, a binary output port.  Bytes
;;               written to the port become readable by CONSUMER.  The port
;;               is closed automatically once PRODUCTOR returns.
;;   consumer  : procedure of one argument, a binary input port.  It sees
;;               end-of-file after the write side is closed and all
;;               buffered bytes have been read.
;;
;; Returns two values: the results of `thread-join!' on the producer thread
;; and on the consumer thread, in that order.
;;
;; All shared state (the chunk queue, the read offset, the byte counter and
;; the closed flag) is only touched while MUTEX is held.  The reader sleeps
;; on the condition variable DATA-READY instead of re-locking the mutex, so
;; the writer can always make progress.
(define (call-with-port-pipe productor consumer)
  (let* ((chunks (make-queue))            ; queue of bytevectors not yet fully read
         (pipe-write-port-closed? #f)     ; #t once the write side is closed
         (position 0)                     ; read offset inside the front chunk
         (buffer-rest 0)                  ; total unread bytes across all chunks
         (mutex (make-mutex))
         (data-ready (make-condition-variable)))

    ;; Copies exactly COUNT bytes from the queued chunks into BV starting at
    ;; START and returns COUNT.  The caller must hold MUTEX and guarantee
    ;; COUNT <= buffer-rest.  Since empty chunks are never enqueued, the
    ;; queue cannot run dry while bytes remain to be copied.
    (define (drain! bv start count)
      (let loop ((start start) (remaining count))
        (if (zero? remaining)
            count
            (let* ((chunk (queue-front chunks))
                   (chunk-rest (- (bytevector-length chunk) position))
                   (n (min remaining chunk-rest)))
              (bytevector-copy! chunk position bv start n)
              (dec! buffer-rest n)
              (if (= n chunk-rest)
                  ;; Front chunk exhausted: drop it and restart the offset.
                  (begin (set! position 0)
                         (dequeue! chunks))
                  (inc! position n))
              (loop (+ start n) (- remaining n))))))

    ;; read! procedure of the input port.  Blocks until at least one byte
    ;; is available or the write side is closed, then returns up to COUNT
    ;; bytes.  Returns 0 (end-of-file) only when the pipe is closed and
    ;; drained, or when COUNT itself is 0.
    (define (pipe-read! bv start count)
      (if (zero? count)
          0
          (begin
            (mutex-lock! mutex)
            (let wait ()
              (when (and (zero? buffer-rest)
                         (not pipe-write-port-closed?))
                ;; Unlocks MUTEX and sleeps on DATA-READY.  SRFI-18 does not
                ;; re-acquire the mutex on wake-up, so lock it again before
                ;; re-checking the condition.
                (mutex-unlock! mutex data-ready)
                (mutex-lock! mutex)
                (wait)))
            (let ((n (drain! bv start (min count buffer-rest))))
              (mutex-unlock! mutex)
              n))))

    ;; write! procedure of the output port.  Queues a private copy of the
    ;; COUNT bytes of BV starting at START and wakes a waiting reader.
    ;; Zero-length writes are ignored so no empty chunk enters the queue.
    (define (pipe-write! bv start count)
      (if (zero? count)
          0
          (let ((nbv (make-bytevector count)))
            (bytevector-copy! bv start nbv 0 count)
            (mutex-lock! mutex)
            (enqueue! chunks nbv)
            (inc! buffer-rest count)
            (condition-variable-signal! data-ready)
            (mutex-unlock! mutex)
            count)))

    ;; close procedure of the output port.  Marks the write side closed and
    ;; wakes every waiting reader so it can observe end-of-file.
    (define (pipe-close)
      (mutex-lock! mutex)
      (set! pipe-write-port-closed? #t)
      (condition-variable-broadcast! data-ready)
      (mutex-unlock! mutex))

    (let* ((out (make-custom-binary-output-port "productor" pipe-write!
                                                #f #f pipe-close))
           (in (make-custom-binary-input-port "consumer" pipe-read! #f #f #f))
           (thread-p (make-thread
                      (lambda ()
                        ;; If PRODUCTOR raises, still mark the pipe closed so
                        ;; the consumer sees end-of-file instead of blocking
                        ;; forever; bytes not yet flushed from OUT are
                        ;; discarded.  The condition is then re-raised.
                        (guard (e (#t (pipe-close) (raise e)))
                          (productor out))
                        (close-port out))))
           (thread-c (make-thread (lambda () (consumer in)))))
      (thread-start! thread-p)
      (thread-start! thread-c)
      (values (thread-join! thread-p) (thread-join! thread-c)))))
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!r6rs | |
(import (rnrs) | |
(port-pipe)) | |
;; Example: the producer writes three bytes into the pipe, the consumer
;; reads everything up to end-of-file, displays the resulting bytevector
;; and then displays "end".
(call-with-port-pipe
 (lambda (out)
   (for-each (lambda (byte) (put-u8 out byte))
             '(15 2 5)))
 (lambda (in)
   (let ((received (get-bytevector-all in)))
     (display received)
     (display "end"))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment