Created
July 29, 2018 11:12
-
-
Save filinvadim/33ee5a348f9353cb2fcf71e7338388ee to your computer and use it in GitHub Desktop.
golang elastic channel
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright (c) 2014, Nick Patavalis ([email protected]). | |
// All rights reserved. | |
// Use of this source code is governed by a BSD-style license that can | |
// be found in the LICENSE file. | |
// Package elastic demonstrates a simple implementation of elastic | |
// (growable) buffers for channels. See: | |
// https://github.com/npat-efault/musings/wiki/Elastic-channels | |
const ( | |
// Maximum allowed queue size (practically unlimited). | |
maxQSz = 0x40000000 | |
// Number of items to receive form the input channel | |
// consecutively. | |
maxReceive = 1024 | |
) | |
// ElasticT is an elastic channel of T-typed elements. | |
type ElasticT struct { | |
S chan<- interface{} // Send direction. | |
R <-chan interface{} // Receive direction. | |
} | |
// NewElastic creates and returns a new elastic channel | |
func NewElastic(bufSize int) ElasticT { | |
cin := make(chan interface{}, bufSize) | |
cout := make(chan interface{}, bufSize) | |
e := ElasticT{S: cin, R: cout} | |
go elasticRun(cout, cin) | |
return e | |
} | |
// elasticRun tries to flush the input channel (cin) and the internal queue | |
// before returning back to the select statement. It takes advantage | |
// of the fact that select statements seem to have considerable | |
// overhead over single-channel reveive / select statements. | |
func elasticRun(cout chan<- interface{}, cin <-chan interface{}) { | |
var in <-chan interface{} | |
var out chan<- interface{} | |
var vi, vo interface{} | |
var ok bool | |
q := newCQT(1, maxQSz) | |
in, out = cin, nil | |
for { | |
select { | |
case vi, ok = <-in: | |
inLoop: | |
for i := 0; i < maxReceive; i++ { | |
if !ok { | |
if out == nil { | |
close(cout) | |
return | |
} | |
in = nil | |
break | |
} | |
if out == nil { | |
vo = vi | |
out = cout | |
} else { | |
q.PushBack(vi) | |
} | |
select { | |
case vi, ok = <-in: | |
default: | |
break inLoop | |
} | |
} | |
case out <- vo: | |
outLoop: | |
for { | |
vo, ok = q.PopFront() | |
if !ok { | |
if in == nil { | |
close(cout) | |
return | |
} | |
out = nil | |
} | |
if q.Len() < q.Cap()>>1 { | |
q.Compact(1) | |
} | |
select { | |
case out <- vo: | |
default: | |
break outLoop | |
} | |
} | |
} | |
} | |
} | |
/ Auto-generated. !! DO NOT EDIT !! | |
// Copyright (c) 2014, Nick Patavalis ([email protected]). | |
// All rights reserved. | |
// Use of this source code is governed by a BSD-style license that can | |
// be found in the LICENSE file. | |
// cQT is a circular queue. | |
// | |
// It is implemented with a slice and free running indexes. It starts | |
// with a user specified initial size (which must be a power of 2) and | |
// grows exponentially (doubles in size), when required, to accomodate | |
// more elements (up to a user specified maximum size). | |
// | |
// Queue operations are *NOT* thread safe. | |
type cQT struct { | |
sz uint32 /* current queue size */ | |
maxSz uint32 /* max queue size */ | |
m uint32 /* queue mask (sz - 1) */ | |
s uint32 /* start index */ | |
e uint32 /* end index */ | |
b []interface{} /* buffer */ | |
} | |
// newCQT creates and returns a new circular queue. | |
// | |
// The queue is initially allocated with space for sz elements. It can | |
// grow, when required, to accomodate up to maxSz elements. Both sz | |
// and maxSz *must* be powers of 2. | |
func newCQT(sz, maxSz int) *cQT { | |
if sz <= 0 || uint32(sz)&(uint32(sz)-1) != 0 || | |
uint32(maxSz)&(uint32(maxSz)-1) != 0 || | |
maxSz < sz { | |
panic("Invalid Q size") | |
} | |
cq := &cQT{ | |
sz: uint32(sz), maxSz: uint32(maxSz), | |
m: uint32(sz) - 1, | |
s: 0, e: 0, | |
} | |
cq.b = make([]interface{}, sz) | |
return cq | |
} | |
// Empty tests if the queue is empty. | |
func (cq *cQT) Empty() bool { | |
return cq.s == cq.e | |
} | |
// Full tests if the queue is full. | |
func (cq *cQT) Full() bool { | |
return cq.e-cq.s == cq.maxSz | |
} | |
// Len returns the number of elements waiting in the queue. | |
func (cq *cQT) Len() int { | |
return int(cq.e - cq.s) | |
} | |
// Cap returns the capacity of the queue (# of element slots currently | |
// allocated). | |
func (cq *cQT) Cap() int { | |
return int(cq.sz) | |
} | |
// MaxCap returns the maximum capacity of the queue (max # of element | |
// allowed). | |
func (cq *cQT) MaxCap() int { | |
return int(cq.maxSz) | |
} | |
// PeekFront returns the front (head) element of the queue, without | |
// removing it. Returns ok == false if the list is empty (unable to | |
// peek element), ok == true otherwise. | |
func (cq *cQT) PeekFront() (el interface{}, ok bool) { | |
if cq.s == cq.e { | |
return el, false | |
} | |
return cq.b[cq.s&cq.m], true | |
} | |
// PeekBack returns the back (tail) element of the queue, without | |
// removing it. Returns ok == false if the list is empty (unable to | |
// peek element), ok == true otherwise. | |
func (cq *cQT) PeekBack() (el interface{}, ok bool) { | |
if cq.s == cq.e { | |
return el, false | |
} | |
return cq.b[(cq.e-1)&cq.m], true | |
} | |
// PopHead removes the front (head) element from the queue and returns | |
// it. Returns ok == false if the list was empty (unable to pop | |
// element), ok == true otherwise. | |
func (cq *cQT) PopFront() (el interface{}, ok bool) { | |
var zero itemOnDelete | |
if cq.s == cq.e { | |
return zero, false | |
} | |
el = cq.b[cq.s&cq.m] | |
cq.b[cq.s&cq.m] = zero | |
cq.s++ | |
return el, true | |
} | |
// PopBack removes the back (tail) element from the queue and returns | |
// it. Returns ok == false if the list was empty (unable to pop | |
// elemnt), ok == true otherwise. | |
func (cq *cQT) PopBack() (el interface{}, ok bool) { | |
var zero itemOnDelete | |
if cq.s == cq.e { | |
return zero, false | |
} | |
cq.e-- | |
el = cq.b[cq.e&cq.m] | |
cq.b[cq.e&cq.m] = zero | |
return el, true | |
} | |
// PushBack adds element "el" to the back (tail) of the queue. Returns | |
// ok == false if the list was full (unable to push element), ok == | |
// true otherwise. | |
func (cq *cQT) PushBack(el interface{}) (ok bool) { | |
if cq.e-cq.s == cq.sz { | |
if cq.sz == cq.maxSz { | |
return false | |
} | |
cq.resize(cq.sz << 1) | |
} | |
cq.b[cq.e&cq.m] = el | |
cq.e++ | |
return true | |
} | |
// PushFront adds element "e" to the front (head) of the queue. Returns | |
// ok == false if the list was full (unable to push element), ok == | |
// true otherwise. | |
func (cq *cQT) PushFront(el interface{}) (ok bool) { | |
if cq.e-cq.s == cq.sz { | |
if cq.sz == cq.maxSz { | |
return false | |
} | |
cq.resize(cq.sz << 1) | |
} | |
cq.s-- | |
cq.b[cq.s&cq.m] = el | |
return true | |
} | |
// roundUp2 rounds v up to the nearest power of 2 | |
func roundUp2(v uint32) uint32 { | |
if v == 0 { | |
return 1 | |
} | |
v-- | |
v |= v >> 1 | |
v |= v >> 2 | |
v |= v >> 4 | |
v |= v >> 8 | |
v |= v >> 16 | |
v++ | |
return v | |
} | |
// Compact resizes the queue slice (without removing elements from the | |
// queue) to the smallest possible size, but not smaller than | |
// sz. Argument sz *must* be a power of 2. In effect, Compact changes | |
// the current size of the queue slice to the smalest possible size | |
// nSz that satisfies all three: (1) nSz is a power of 2, (2) nSz >= | |
// cq.Len(), (3) nSz >= sz. Compact does not affect the capacity | |
// (maxSz) of the queue. | |
func (cq *cQT) Compact(sz int) { | |
if sz < 0 || uint32(sz) > cq.maxSz || uint32(sz)&(uint32(sz-1)) != 0 { | |
panic("Compact Q with invalid size") | |
} | |
nSz := roundUp2(cq.e - cq.s) | |
if nSz < uint32(sz) { | |
nSz = uint32(sz) | |
} | |
if nSz == cq.sz { | |
return | |
} | |
cq.resize(nSz) | |
} | |
// resize, resizes the queue to size sz. The caller *must* make sure | |
// than sz satisfies all three: (1) sz >= cq.Len(), (2) sz is a power | |
// of 2, (3) sz <= cq.maxSz | |
func (cq *cQT) resize(sz uint32) { | |
b := make([]interface{}, 0, sz) | |
si, ei := cq.s&cq.m, cq.e&cq.m | |
if si < ei { | |
b = append(b, cq.b[si:ei]...) | |
} else { | |
b = append(b, cq.b[si:]...) | |
b = append(b, cq.b[:ei]...) | |
} | |
cq.b = b[:sz] | |
cq.s, cq.e = 0, cq.e-cq.s | |
cq.sz = sz | |
cq.m = sz - 1 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment