Created
June 30, 2017 08:39
-
-
Save NalaGinrut/5a5d08ce7a9b6d5610f73dd269a6f242 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(use-modules (artanis server epoll) | |
(artanis utils) | |
(ice-9 rdelim) | |
(ice-9 match) | |
(ice-9 suspendable-ports) | |
(ice-9 control)) | |
(install-suspendable-ports!) | |
;; Entry point used by the event loop: forwards both arguments unchanged to
;; `%call-with-sigint', whose implementation is picked when that definition
;; is evaluated.  X is the thunk to run, Y the thunk to call on interrupt.
(define call-with-sigint
  (lambda (x y)
    (%call-with-sigint x y)))
;; A procedure of two thunks, selected once when this definition is evaluated:
;;  - without the 'posix feature it simply calls THUNK;
;;  - otherwise THUNK runs while a SIGINT handler that throws 'interrupt is
;;    installed; the handler that was active before is put back whenever
;;    THUNK's dynamic extent is left, and if 'interrupt was thrown the result
;;    of calling HANDLER-THUNK is returned instead.
(define %call-with-sigint
  (cond
   ((provided? 'posix)
    (lambda (thunk handler-thunk)
      (let ((saved #f))
        ;; `sigaction' with a new handler returns the previous
        ;; (handler . flags) pair; keep it so it can be reinstated.
        (define (install!)
          (set! saved
                (sigaction SIGINT (lambda (sig) (throw 'interrupt)))))
        (define (restore!)
          (if saved
              ;; restore Scheme handler, SIG_IGN or SIG_DFL.
              (sigaction SIGINT (car saved) (cdr saved))
              ;; restore original C handler.
              (sigaction SIGINT #f)))
        (catch 'interrupt
          (lambda ()
            (dynamic-wind install! thunk restore!))
          (lambda (key . _)
            (handler-thunk))))))
   (else
    (lambda (thunk handler-thunk)
      (thunk)))))
;; Base epoll event masks, built from the constants exported by
;; (artanis server epoll).
(define *read-event* EPOLLIN)
(define *write-event* EPOLLOUT)
(define *rw-event* (logior EPOLLIN EPOLLOUT))
(define *error-event* (logior EPOLLRDHUP EPOLLHUP))

;; Mask builders.  Every builder adds EPOLLET; the write and read/write
;; builders also add *error-event*, the read builder does not.
(define (gen-read-event)
  (logior EPOLLET *read-event*))

(define (gen-write-event)
  (logior *write-event* *error-event* EPOLLET))

(define (gen-rw-event)
  (logior *rw-event* *error-event* EPOLLET))
;; Process-wide server state.

;; The epoll instance every socket of this server is registered with.
(define epfd (epoll-create1 0))

;; Event buffer handed to `epoll-wait' on every poll.
(define *event-set* (make-epoll-event-set))

;; Maps a file descriptor to the continuation of the task parked on it
;; (entries are written by the prompt handler in `run-task').
(define *work-table* (make-hash-table))

;; Listening socket port; starts out unspecified and is assigned by `go'.
(define listen-socket (if #f #f))

;; Starts out unspecified; no visible code in this file reads or assigns it.
(define listen-fd (if #f #f))
;; Installed as `current-read-waiter' by `try-it'.  Called with PORT when a
;; read on it cannot proceed: logs a marker line, then escapes to the
;; 'serve-one-request prompt, passing PORT's file descriptor to the prompt
;; handler.
(define (async-read-waiter port)
  (display "Async read!\n")
  ;;(format #t "would break ~a~%" port)
  (let ((fd (port->fdes port)))
    (abort-to-prompt 'serve-one-request fd)))
;; Installed as `current-write-waiter' by `try-it'.  Called with PORT when a
;; write on it cannot proceed: logs a marker line, then escapes to the
;; 'serve-one-request prompt, passing PORT's file descriptor to the prompt
;; handler.
(define (async-write-waiter port)
  (display "Async write!\n")
  ;;(format #t "would break ~a~%" port)
  (let ((fd (port->fdes port)))
    (abort-to-prompt 'serve-one-request fd)))
;; Create a non-blocking stream socket of FAMILY, bind it to ADDR/PORT, start
;; listening (backlog 5) and return the socket port.  As a side effect
;; SIGPIPE is set to be ignored for the whole process.
(define (make-listen-port family addr port)
  (define sock (socket family SOCK_STREAM 0))
  ;; OR O_NONBLOCK into whatever status flags the descriptor already has.
  (define (make-nonblocking! s)
    (let ((flags (fcntl s F_GETFL 0)))
      (fcntl s F_SETFL (logior O_NONBLOCK flags))))
  ;; Causes the port to be released immediately after the socket is closed.
  (setsockopt sock SOL_SOCKET SO_REUSEADDR 1)
  (make-nonblocking! sock)
  (bind sock family addr port)
  (listen sock 5)
  (sigaction SIGPIPE SIG_IGN)
  sock)
;; Write MSG to the port of every file descriptor that has an entry in
;; *work-table*, flushing after each write.
;;
;; The descriptors are snapshotted first: a write that cannot proceed escapes
;; through the write waiter and the prompt handler in `run-task' then stores
;; into *work-table*, so the table must not be traversed while writing.
;; A descriptor for which `fdes->ports' finds no port is logged and skipped
;; instead of raising from `car' on an empty list.
(define (multicast-msg msg)
  (for-each
   (lambda (fd)
     (match (fdes->ports fd)
       ((port . _)
        (display msg port)
        (force-output port))
       (_
        (format #t "multicast: no port for fd ~a, skipped~%" fd))))
   (hash-map->list (lambda (fd _) fd) *work-table*)))
;; Log, then add CONN-PORT's file descriptor to the epoll instance `epfd'
;; with the mask produced by `gen-rw-event'.
(define (register-connecting-socket conn-port)
  (format #t "register ~a as RW event~%" conn-port)
  (let ((event (make-epoll-event (port->fdes conn-port) (gen-rw-event))))
    (epoll-ctl epfd EPOLL_CTL_ADD (port->fdes conn-port) event)))
;; Read the SO_ERROR socket option of SOCK, log its value, and return #t
;; exactly when that value is zero.
(define (socket-is-connected? sock)
  (define pending-error (getsockopt sock SOL_SOCKET SO_ERROR))
  (format #t "SO_ERROR is ~a~%" pending-error)
  (zero? pending-error))
;; Tear down the client connection on CONN-PORT.
;;
;; REASON is a symbol used for logging; when it is 'quit a farewell line
;; addressed to USERNAME (or "noname" when USERNAME is #f) is sent first.
;; The descriptor is removed from the epoll instance and from *work-table*,
;; then the socket is shut down in both directions.
;;
;; Raises an error if CONN-PORT shares its descriptor with the listening
;; socket.  Failures of the farewell write and of `shutdown' (for example
;; because the peer is already gone) are logged and do not propagate, so a
;; half-dead connection cannot abort the teardown or the caller's loop.
(define (release-connecting-socket reason username conn-port)
  ;; Run THUNK; a system-error is reported under the label WHAT and dropped.
  (define (best-effort what thunk)
    (catch 'system-error
      thunk
      (lambda err
        (format #t "~a failed for ~a: ~a~%" what conn-port err))))
  (format #t "releasing ~a because of ~a~%" conn-port reason)
  (when (= (port->fdes listen-socket) (port->fdes conn-port))
    (error "BUG here!"))
  (epoll-ctl epfd EPOLL_CTL_DEL (port->fdes conn-port) #f)
  (hash-remove! *work-table* (port->fdes conn-port))
  (when (and (eq? reason 'quit)
             (not (port-closed? conn-port)))
    (best-effort "farewell"
                 (lambda ()
                   (format conn-port "Goodbye ~a~%~!" (or username "noname")))))
  ;; Disabled by the original author (kept for reference):
  ;; (when (not (port-closed? conn-port))
  ;;   (cond
  ;;    ((socket-is-connected? conn-port)
  ;;     (format #t "~a is still connected, shutdown it!~%" conn-port)
  ;;     (shutdown conn-port 2))
  ;;    (else
  ;;     (format #t "~a has been disconnected!~%" conn-port))))
  (format #t "Now closing ~a~%" conn-port)
  ;; NOTE(review): the port is only shut down, never closed; the author
  ;; disabled the close below, so the descriptor is presumably left for the
  ;; garbage collector to reclaim -- confirm this is intended.
  ;;(when (not (port-closed? conn-port)) (close-port conn-port))
  (unless (port-closed? conn-port)
    (best-effort "shutdown"
                 (lambda ()
                   (shutdown conn-port 2)))))
;; Chat session for one client.  CLIENT-CONNECTION is a pair whose car is the
;; connection's socket port and whose cdr is the peer details that get logged.
;;
;; The port is registered with epoll, switched to non-blocking, block-buffered
;; mode, and the client is asked for a name.  Every later line is echoed to
;; stdout and handed to `multicast-msg'.  The session ends through
;; `release-connecting-socket': with reason 'quit when the port is closed,
;; SO_ERROR is non-zero or EOF is seen, and with reason 'exception when
;; anything inside the session throws.
(define (read-from-1234 client-connection)
  (define username #f)

  ;; True when the session on PORT should end.
  (define (session-over? port)
    (or (port-closed? port)
        (not (socket-is-connected? port))
        (eof-object? (peek-char port))))

  ;; Strip spaces and carriage returns from both ends of LINE.
  (define (strip-line line)
    (string-trim-both line
                      (lambda (c) (member c '(#\sp #\return)))))

  (define (session)
    (display "start now!\n")
    (let ((peer (cdr client-connection))
          (conn-port (car client-connection)))
      (register-connecting-socket conn-port)
      (format #t "client-connection: ~a~%" client-connection)
      (setsockopt conn-port SOL_SOCKET SO_KEEPALIVE 1)
      ;;(setsockopt conn-port SOL_SOCKET SO_REUSEADDR 1)
      (let ((flags (fcntl conn-port F_GETFL 0)))
        (fcntl conn-port F_SETFL (logior O_NONBLOCK flags)))
      (setvbuf conn-port 'block)
      (setsockopt conn-port SOL_SOCKET SO_SNDBUF (* 12 1024))
      (format #t "Got new client connection: ~a~%" peer)
      (format conn-port "Hello guest, please type your name~%~!")
      (let loop ()
        (cond
         ((session-over? conn-port)
          (release-connecting-socket 'quit username conn-port))
         (username
          ;; Named client: relay one line to everybody.
          (let* ((line (read-line conn-port))
                 (msg (format #f "~a: ~a~%" username line)))
            (display msg)
            (multicast-msg msg)
            (loop)))
         (else
          ;; Still anonymous: the next non-empty line becomes the name.
          (let ((name (strip-line (read-line conn-port))))
            (if (string-null? name)
                (format conn-port "On come on! Give me your name!~%~!")
                (begin
                  (set! username name)
                  (format conn-port "Hello ~a, there're ~a people here~%~!" username (hash-count (const #t) *work-table*))))
            (loop)))))))

  ;; E is the list of the throw key followed by its arguments.
  (define (on-error . e)
    (format #t "Exception: ~a~%" e)
    (release-connecting-socket 'exception username (car client-connection)))

  (catch #t session on-error))
;; Server entry point: open the listening socket on port 1234, register it
;; with epoll for read events, publish it in `listen-socket' and enter the
;; event loop.  Anything thrown out of the loop is printed, after which `go'
;; returns.
(define (go)
  (define (report-failure . e)
    (format #t "General error happened!~%~!")
    (format #t "~a~%" e))
  ;; The address argument is 0; INADDR_LOOPBACK was disabled by the author.
  (let* ((server-sock (make-listen-port AF_INET 0 1234))
         (server-event (make-epoll-event (port->fdes server-sock)
                                         (gen-read-event))))
    (sigaction SIGPIPE SIG_IGN) ; FIXME: should we remove the related threads?
    (epoll-ctl epfd EPOLL_CTL_ADD (port->fdes server-sock) server-event)
    (set! listen-socket server-sock)
    (catch #t try-it report-failure)))
;; Run TASK (a thunk, or a previously captured continuation) under the
;; 'serve-one-request prompt.  If it escapes to that prompt -- which is what
;; the read/write waiters do -- the continuation K is stored in *work-table*
;; under the file descriptor FD handed over by the waiter.
(define (run-task task)
  (define (start)
    (format #t "now we run this task: ~a~%" task)
    (task))
  (define (park k fd)
    (format #t "task ~a said: I need schedule!~%" fd)
    (hash-set! *work-table* fd k))
  (call-with-prompt 'serve-one-request start park))
;; Dispatch one epoll event.  CLIENT is the event pair whose car is the file
;; descriptor that became ready.
;;
;;  - If a task is parked on that descriptor in *work-table*, resume it.
;;  - If the descriptor is the listening socket, accept every pending
;;    connection and start a `read-from-1234' task for each.
;;  - Any other descriptor is logged and ignored; calling `accept' on it
;;    would raise.
;;
;; Raises an error when CLIENT is not a pair with a numeric car.
(define (serve-one-request client)
  (define (continuatble-work?)
    (cond
     ((and (pair? client) (number? (car client))) ; this type is generated from epoll
      (hash-ref *work-table* (car client)))
     (else (error continuatble-work? "Wrong client" client))))

  ;; The listener is registered edge-triggered (see `gen-read-event'), so one
  ;; event may stand for several queued connections: keep accepting until
  ;; `accept' reports that nothing is pending by returning #f, which is what
  ;; it does on a non-blocking socket (`make-listen-port' sets O_NONBLOCK).
  (define (accept-pending-clients listen-port)
    (let next ()
      (let ((conn (accept listen-port)))
        (when conn
          (setvbuf (car conn) 'block)
          (setsockopt (car conn) SOL_SOCKET SO_SNDBUF (* 12 1024))
          (run-task (lambda () (read-from-1234 conn)))
          (next)))))

  (format #t "wt: ~a~%" (hash-map->list cons *work-table*))
  (format #t "Now get client ~a~%" client)
  (cond
   ((continuatble-work?)
    ;;=> run-task
    => (lambda (task)
         (format #t "let's continue ~a~%" client)
         (run-task task)))
   ((= (car client) (port->fdes listen-socket))
    (format #t "new task ~a~%" client)
    (accept-pending-clients listen-socket))
   (else
    (format #t "ignoring event ~a: no parked task and not the listening socket~%"
            client))))
;; True when the event mask in the cdr of the epoll event E has the
;; EPOLLRDHUP bit set.
(define-public (is-peer-shutdown? e)
  (logtest (cdr e) EPOLLRDHUP))
;; The event loop.  With the asynchronous read/write waiters installed, wait
;; on `epfd' (3000 is passed to `epoll-wait' as its timeout argument) and
;; handle every returned event, forever.  Returns only when something throws
;; out of the loop.
;;
;; The loop runs inside a single `parameterize'.  Calling `try-it' again from
;; within the `parameterize' body would not be a tail call, so each poll
;; cycle would leave another frame and another dynamic binding behind.
(define (try-it)
  ;; First port attached to the descriptor of event E, or #f if none is left.
  (define (event-port e)
    (match (fdes->ports (car e))
      ((port . _) port)
      (_ #f)))

  ;; Release the connection behind E, if there is still a port to release.
  (define (release-event reason e)
    (let ((port (event-port e)))
      (if port
          (release-connecting-socket reason "unknown" port)
          (format #t "no port left for event ~a, nothing to release~%" e))))

  ;; Handle one event; a SIGINT arriving meanwhile drops that connection.
  (define (handle-event e)
    (call-with-sigint
     (lambda ()
       (if (is-peer-shutdown? e)
           (release-event 'peer-shutdown e)
           (serve-one-request e)))
     (lambda ()
       (format #t "I'm interrupted ~a!~%" e)
       (release-event 'interrupt e))))

  (parameterize ((current-read-waiter async-read-waiter)
                 (current-write-waiter async-write-waiter))
    (let loop ()
      (display "enter!\n")
      (format #t "listen socket is ~a~%" listen-socket)
      (format #t "is-closed? (~a)~%" (port-closed? listen-socket))
      (display "success!\n")
      (let ((el (epoll-wait epfd *event-set* 3000)))
        (format #t "Waiting for request...now there's ~a requests~%" (length el))
        (format #t "wt: ~a~%" (hash-map->list cons *work-table*))
        (for-each handle-event el))
      (loop))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment