Created
August 3, 2020 05:58
-
-
Save chanjarster/a3b6957ad1f4117038722372cafbecb9 to your computer and use it in GitHub Desktop.
极客时间大爷胡同口优化
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// https://time.geekbang.org/column/article/126504 | |
// 张大爷在胡同口等着 ... | |
// 碰见一个李大爷:127.0.0.1:59668 | |
// 耗时: 40.354851ms | |
package main | |
import ( | |
"bytes" | |
"encoding/binary" | |
"fmt" | |
"io" | |
"net" | |
"sync" | |
"time" | |
) | |
var zRecvCount = uint32(0) // 张大爷听到了多少句话 | |
var lRecvCount = uint32(0) // 李大爷听到了多少句话 | |
var total = uint32(100000) // 总共需要遇见多少次 | |
var z0 = "吃了没,您吶?" | |
var z3 = "嗨!吃饱了溜溜弯儿。" | |
var z5 = "回头去给老太太请安!" | |
var l1 = "刚吃。" | |
var l2 = "您这,嘛去?" | |
var l4 = "有空家里坐坐啊。" | |
type RequestResponse struct { | |
Serial uint32 // 序号 | |
Payload string // 内容 | |
} | |
var liChan = make(chan *RequestResponse, 10000) // 李大爷channel | |
var zhangChan = make(chan *RequestResponse, 10000) // 张大爷channel | |
func queue(r *RequestResponse, ch chan *RequestResponse) { | |
select { | |
case ch <- r: | |
default: | |
go func() { | |
defer func() { | |
if recover() != nil { | |
// send on closed channel | |
return | |
} | |
}() | |
ch <- r | |
}() | |
} | |
} | |
func consume(conn *net.TCPConn, ch chan *RequestResponse) { | |
// 复用[]byte | |
buf := &bytes.Buffer{} | |
for { | |
// 第一个消息总是阻塞的 | |
r, ok := <-ch | |
if !ok { | |
// channel已经关闭了 | |
return | |
} | |
serialize(r, buf) | |
// 收集一波再发送 | |
for i := 0; i < 49; i++ { | |
select { | |
case r, ok := <-ch: | |
if !ok { | |
// channel closed | |
break | |
} | |
serialize(r, buf) | |
default: | |
// nothing in the channel | |
break | |
} | |
} | |
if buf.Len() > 0 { | |
conn.Write(buf.Bytes()) | |
buf.Reset() | |
} | |
} | |
} | |
// 序列化RequestResponse,并发送 | |
// 序列化后的结构如下: | |
// 长度 4字节 | |
// Serial 4字节 | |
// PayLoad 变长 | |
func serialize(r *RequestResponse, writer io.Writer) { | |
payloadBytes := []byte(r.Payload) | |
serialBytes := make([]byte, 4) | |
binary.BigEndian.PutUint32(serialBytes, r.Serial) | |
length := uint32(len(payloadBytes) + len(serialBytes)) | |
lengthByte := make([]byte, 4) | |
binary.BigEndian.PutUint32(lengthByte, length) | |
writer.Write(lengthByte) | |
writer.Write(serialBytes) | |
writer.Write(payloadBytes) | |
} | |
// 接收数据,反序列化成RequestResponse | |
func readFrom(conn *net.TCPConn) (*RequestResponse, error) { | |
ret := &RequestResponse{} | |
buf := make([]byte, 4) | |
if _, err := io.ReadFull(conn, buf); err != nil { | |
return nil, fmt.Errorf("读长度故障:%s", err.Error()) | |
} | |
length := binary.BigEndian.Uint32(buf) | |
if _, err := io.ReadFull(conn, buf); err != nil { | |
return nil, fmt.Errorf("读Serial故障:%s", err.Error()) | |
} | |
ret.Serial = binary.BigEndian.Uint32(buf) | |
payloadBytes := make([]byte, length-4) | |
if _, err := io.ReadFull(conn, payloadBytes); err != nil { | |
return nil, fmt.Errorf("读Payload故障:%s", err.Error()) | |
} | |
ret.Payload = string(payloadBytes) | |
return ret, nil | |
} | |
// 张大爷的耳朵 | |
func zhangDaYeListen(conn *net.TCPConn, wg *sync.WaitGroup) { | |
defer func() { | |
close(zhangChan) | |
wg.Done() | |
}() | |
for zRecvCount < total*3 { | |
r, err := readFrom(conn) | |
if err != nil { | |
fmt.Println(err.Error()) | |
break | |
} | |
// fmt.Println("张大爷收到:" + r.Payload) | |
if r.Payload == l2 { // 如果收到:您这,嘛去? | |
queue(&RequestResponse{r.Serial, z3}, zhangChan) | |
//go writeTo(&RequestResponse{r.Serial, z3}, conn, &zhangWriteLock) // 回复:嗨!吃饱了溜溜弯儿。 | |
} else if r.Payload == l4 { // 如果收到:有空家里坐坐啊。 | |
queue(&RequestResponse{r.Serial, z5}, zhangChan) | |
//go writeTo(&RequestResponse{r.Serial, z5}, conn, &zhangWriteLock) // 回复:回头去给老太太请安! | |
} else if r.Payload == l1 { // 如果收到:刚吃。 | |
// 不用回复 | |
} else { | |
fmt.Println("张大爷听不懂:" + r.Payload) | |
break | |
} | |
zRecvCount++ | |
} | |
} | |
// 张大爷的嘴 | |
func zhangDaYeSay(conn *net.TCPConn) { | |
nextSerial := uint32(0) | |
for i := uint32(0); i < total; i++ { | |
queue(&RequestResponse{nextSerial, z0}, zhangChan) | |
//writeTo(&RequestResponse{nextSerial, z0}, conn, &zhangWriteLock) | |
nextSerial++ | |
} | |
} | |
// 李大爷的耳朵,实现是和张大爷类似的 | |
func liDaYeListen(conn *net.TCPConn, wg *sync.WaitGroup) { | |
defer func() { | |
close(liChan) | |
wg.Done() | |
}() | |
for lRecvCount < total*3 { | |
r, err := readFrom(conn) | |
if err != nil { | |
fmt.Println(err.Error()) | |
break | |
} | |
// fmt.Println("李大爷收到:" + r.Payload) | |
if r.Payload == z0 { // 如果收到:吃了没,您吶? | |
queue(&RequestResponse{r.Serial, l1}, liChan) | |
//writeTo(&RequestResponse{r.Serial, l1}, conn, &liWriteLock) // 回复:刚吃。 | |
} else if r.Payload == z3 { | |
// do nothing | |
} else if r.Payload == z5 { | |
// do nothing | |
} else { | |
fmt.Println("李大爷听不懂:" + r.Payload) | |
break | |
} | |
lRecvCount++ | |
} | |
} | |
// 李大爷的嘴 | |
func liDaYeSay(conn *net.TCPConn) { | |
nextSerial := uint32(0) | |
for i := uint32(0); i < total; i++ { | |
queue(&RequestResponse{nextSerial, l2}, liChan) | |
//writeTo(&RequestResponse{nextSerial, l2}, conn, &liWriteLock) | |
nextSerial++ | |
queue(&RequestResponse{nextSerial, l4}, liChan) | |
//writeTo(&RequestResponse{nextSerial, l4}, conn, &liWriteLock) | |
nextSerial++ | |
} | |
} | |
func startServer(wg *sync.WaitGroup) { | |
tcpAddr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:9999") | |
tcpListener, _ := net.ListenTCP("tcp", tcpAddr) | |
defer tcpListener.Close() | |
fmt.Println("张大爷在胡同口等着 ...") | |
for { | |
conn, err := tcpListener.AcceptTCP() | |
if err != nil { | |
fmt.Println(err) | |
break | |
} | |
fmt.Println("碰见一个李大爷:" + conn.RemoteAddr().String()) | |
go zhangDaYeListen(conn, wg) | |
go zhangDaYeSay(conn) | |
consume(conn, zhangChan) | |
} | |
} | |
func startClient(wg *sync.WaitGroup) *net.TCPConn { | |
var tcpAddr *net.TCPAddr | |
tcpAddr, _ = net.ResolveTCPAddr("tcp", "127.0.0.1:9999") | |
conn, _ := net.DialTCP("tcp", nil, tcpAddr) | |
go liDaYeListen(conn, wg) | |
go liDaYeSay(conn) | |
consume(conn, liChan) | |
return conn | |
} | |
func main() { | |
var wg sync.WaitGroup | |
wg.Add(2) | |
go startServer(&wg) | |
time.Sleep(time.Second) | |
conn := startClient(&wg) | |
t1 := time.Now() | |
wg.Wait() | |
elapsed := time.Since(t1) | |
conn.Close() | |
fmt.Println("耗时: ", elapsed) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
666