Created
November 10, 2016 22:06
-
-
Save xlab/7545e514b4a74aeea7128f7ca8ed0207 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// stopPumping is the sentinel event understood by pump: sending a
// stopPumping value on a pump's src channel asks the pump to deliver whatever
// is already queued, close dst, and ignore every later event.
type stopPumping struct{}

// pump returns a channel src such that sending on src will eventually send on
// dst, in order, but that src will always be ready to send/receive soon, even
// if dst currently isn't. It is effectively an infinitely buffered channel.
//
// In particular, goroutine A sending on src will not deadlock even if goroutine
// B that's responsible for receiving on dst is currently blocked trying to
// send to A on a separate channel.
//
// Send a stopPumping on the src channel to close the dst channel after all queued
// events are sent on dst. After that, other goroutines can still send to src,
// so that such sends won't block forever, but such events will be ignored.
// This holds even while previously queued events are still waiting to be
// delivered on dst.
//
// The goroutine started by pump never exits unless src is closed after the
// pump has been stopped. Closing src before sending stopPumping is not
// supported: each receive would yield a nil event that gets queued.
func pump(dst chan interface{}) (src chan interface{}) {
	src = make(chan interface{})
	go func() {
		// initialSize is the initial size of the circular buffer. It must be a
		// power of 2.
		const initialSize = 16

		// buf is a circular buffer. i counts the events delivered on dst and
		// j counts the events queued so far, so the pending events are
		// buf[i&mask] ... buf[(j-1)&mask]. mask is always len(buf)-1.
		i, j, buf, mask := 0, 0, make([]interface{}, initialSize), initialSize-1

		// stopped records that a stopPumping has been received. src keeps
		// being drained afterwards so that senders never block behind a slow
		// dst consumer; the drained events are simply dropped.
		stopped := false
		for {
			// A nil channel is never ready in a select, so the send case is
			// disabled whenever there is nothing queued.
			maybeDst := dst
			if i == j {
				maybeDst = nil
			}
			if maybeDst == nil && stopped {
				// The pump is stopped and the queue is empty.
				break
			}

			select {
			case maybeDst <- buf[i&mask]:
				// Drop the reference so the delivered event can be collected.
				buf[i&mask] = nil
				i++

			case e := <-src:
				if stopped {
					// Events sent after stopPumping are ignored.
					continue
				}
				if _, ok := e.(stopPumping); ok {
					stopped = true
					continue
				}

				// Allocate a bigger buffer if necessary.
				if i+len(buf) == j {
					// The buffer is full, so i&mask == j&mask: copying from
					// that position to the end and then from the start up to
					// it lays the pending events out in order from index 0.
					b := make([]interface{}, 2*len(buf))
					n := copy(b, buf[j&mask:])
					copy(b[n:], buf[:j&mask])
					i, j = 0, len(buf)
					buf, mask = b, len(b)-1
				}

				buf[j&mask] = e
				j++
			}
		}

		close(dst)
		// Block forever, swallowing any further events so that senders on src
		// never get stuck.
		for range src {
		}
	}()
	return src
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment