Created
August 22, 2016 23:07
-
-
Save nemosupremo/476ae25d894128632fcd7f1d7ee2161d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sarama | |
import ( | |
"bufio" | |
"net" | |
"sort" | |
) | |
type none struct{} | |
// make []int32 sortable so we can sort partition numbers | |
type int32Slice []int32 | |
func (slice int32Slice) Len() int { | |
return len(slice) | |
} | |
func (slice int32Slice) Less(i, j int) bool { | |
return slice[i] < slice[j] | |
} | |
func (slice int32Slice) Swap(i, j int) { | |
slice[i], slice[j] = slice[j], slice[i] | |
} | |
func dupeAndSort(input []int32) []int32 { | |
ret := make([]int32, 0, len(input)) | |
for _, val := range input { | |
ret = append(ret, val) | |
} | |
sort.Sort(int32Slice(ret)) | |
return ret | |
} | |
func withRecover(fn func()) { | |
defer func() { | |
handler := PanicHandler | |
if handler != nil { | |
if err := recover(); err != nil { | |
handler(err) | |
} | |
} | |
}() | |
fn() | |
} | |
func safeAsyncClose(b *Broker) { | |
tmp := b // local var prevents clobbering in goroutine | |
go withRecover(func() { | |
if connected, _ := tmp.Connected(); connected { | |
if err := tmp.Close(); err != nil { | |
Logger.Println("Error closing broker", tmp.ID(), ":", err) | |
} | |
} | |
}) | |
} | |
// Encoder is a simple interface for any type that can be encoded as an array of bytes | |
// in order to be sent as the key or value of a Kafka message. Length() is provided as an | |
// optimization, and must return the same as len() on the result of Encode(). | |
type Encoder interface { | |
Encode() ([]byte, error) | |
Length() int | |
} | |
// make strings and byte slices encodable for convenience so they can be used as keys | |
// and/or values in kafka messages | |
// StringEncoder implements the Encoder interface for Go strings so that they can be used | |
// as the Key or Value in a ProducerMessage. | |
type StringEncoder string | |
func (s StringEncoder) Encode() ([]byte, error) { | |
return []byte(s), nil | |
} | |
func (s StringEncoder) Length() int { | |
return len(s) | |
} | |
// ByteEncoder implements the Encoder interface for Go byte slices so that they can be used | |
// as the Key or Value in a ProducerMessage. | |
type ByteEncoder []byte | |
func (b ByteEncoder) Encode() ([]byte, error) { | |
return b, nil | |
} | |
func (b ByteEncoder) Length() int { | |
return len(b) | |
} | |
// bufConn wraps a net.Conn with a buffer for reads to reduce the number of | |
// reads that trigger syscalls. | |
type bufConn struct { | |
net.Conn | |
buf *bufio.Reader | |
} | |
func newBufConn(conn net.Conn) *bufConn { | |
return &bufConn{ | |
Conn: conn, | |
buf: bufio.NewReaderSize(conn, 32768), | |
} | |
} | |
func (bc *bufConn) Read(b []byte) (n int, err error) { | |
return bc.buf.Read(b) | |
} | |
// KafkaVersion instances represent versions of the upstream Kafka broker. | |
type KafkaVersion struct { | |
// it's a struct rather than just typing the array directly to make it opaque and stop people | |
// generating their own arbitrary versions | |
version [4]uint | |
} | |
func newKafkaVersion(major, minor, veryMinor, patch uint) KafkaVersion { | |
return KafkaVersion{ | |
version: [4]uint{major, minor, veryMinor, patch}, | |
} | |
} | |
// IsAtLeast return true if and only if the version it is called on is | |
// greater than or equal to the version passed in: | |
// V1.IsAtLeast(V2) // false | |
// V2.IsAtLeast(V1) // true | |
func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool { | |
for i := range v.version { | |
if v.version[i] > other.version[i] { | |
return true | |
} else if v.version[i] < other.version[i] { | |
return false | |
} | |
} | |
return true | |
} | |
// Effective constants defining the supported kafka versions. | |
var ( | |
V0_8_2_0 = newKafkaVersion(0, 8, 2, 0) | |
V0_8_2_1 = newKafkaVersion(0, 8, 2, 1) | |
V0_8_2_2 = newKafkaVersion(0, 8, 2, 2) | |
V0_9_0_0 = newKafkaVersion(0, 9, 0, 0) | |
V0_9_0_1 = newKafkaVersion(0, 9, 0, 1) | |
V0_10_0_0 = newKafkaVersion(0, 10, 0, 0) | |
minVersion = V0_8_2_0 | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment