Skip to content

Instantly share code, notes, and snippets.

@BruceChen7
Last active August 17, 2022 14:13
Show Gist options
  • Save BruceChen7/a710a21e6d6c1cd4c43aa0123bac98e3 to your computer and use it in GitHub Desktop.
Save BruceChen7/a710a21e6d6c1cd4c43aa0123bac98e3 to your computer and use it in GitHub Desktop.
[#dict#epoll] #epoll #python #dict#namespace #docker
import select
import socket

if __name__ == "__main__":
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    s.setblocking(False)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    s.bind(('', 4000))
    s.listen(64)
    ep = select.epoll()
    ep.register(s.fileno(), select.EPOLLIN)
    connections = {}

    while True:
        events = ep.poll(1)
        for fd, event in events:
            if fd == s.fileno() and event == select.EPOLLIN:
                conn, addr = s.accept()
                print "connection ", conn, "is connected"
                conn.setblocking(False)
                ep.register(conn.fileno(), select.EPOLLIN)
                connections[conn.fileno()] = conn
                continue
            if event == select.EPOLLIN:
                assert connections.get(fd) != None
                c = connections.get(fd)
                # recevie 1 byte from the tcp connection on purpose
                # it's to validate the idea of lt mode reading
                msg = c.recv(1)
                print msg

如何创建一个字典和给字典赋值

创建一个字典,和给字典赋值都是比较简单的,按照下面,即可创建。

>>> dict1 = {}
>>> dict2 = {'name': 'earth','port': 80}

当然,我们也可以使用工厂方法来创建

>>> fdict = dict((['x',1],['y',2]))
>>>  fdict
{'y':2,'x':1}
>>> fdict = dict(name= "hello",sex ="male")
{"name":"hello","sex":"male"}

访问字典中的值

要遍历一个字典,使用in操作,然后通过字典键加上括号来访问其值

>>> dict2 = {'name': 'earth','port': 80}
>>> for key in dict2
>>>       print 'key= %s,value = %s' %(key,dict2[key])

如果访问一个不存在的键将会报错。

如何更新字典

可以有几种方式对一个字典做修改:添加一个数据项或者是新元素,修改一个已经存在的数据项,删除一个已经存在的数据项。

>>>dict2['name'] = 'venus'
>>>dict2['port'] = 6969

如果字典中该键已经存在,则字典中该键对应的值将被新值替代.

删除字典某元素和字典

通常删除整个字典的操作是不常见的,下面记录了删除的代码:

del dict2['name']#删除name的条目
dict2.clear()  #删除整个条目

字典方法文档搜索

在python字典中输入:dict.xxx

字典方法

构造字典对象

字典构造

字典方法

  • len(dict)获取字典项目
  • clear() 清除所有的项
  • copy() 浅拷贝复制
  • fromkeys(seq,[,value]) 创建一个新的字典,键来自seq,值设置为value。
  • get(key[,default])获取某个键的值,如果没有该键,默认返回None,当然你可以改变默认值
  • items()将所有的字典项以列表的方式返回,其中每一项都是来自于键和值。(这个键和值共同组成元祖)但在返回的时候, 没有特殊顺序。
  • iteritems()的作用相同,但是返回的是迭代器对象而不是列表。
  • keys() 将字典中的键以列表的方式返回。
  • pop(key[,default]) 如果键值存在,那么直接删除该键-值对,并且返回其值,如果default没有给,并且键不存在,那么会产生KeyError

设置key字典的pattern

my_dict.setdefault(key, []).append(new_value)

# 等价于
if key not in my_dict:
    my_dict[key] = []
my_dict[key].append(new_value)

string

def remove_prefix(text, prefix):
    if text.startswith(prefix):
        return text[len(prefix):]
    return text

系统工具简单脚本

tcpdump or iptables or new net namespace

#  -*-coding:utf8 -*-
import atexit
import ctypes
import io
import os
import shlex
import signal
import socket
import subprocess
import sys
import tcp_info
import time


LIBC = ctypes.CDLL("libc.so.6")

CLONE_NEWNET = 0x40000000
original_net_ns = open("/proc/self/ns/net", 'rb')
if True:
    r = LIBC.unshare(CLONE_NEWNET)
    if r != 0:
        print("[!] Are you root? Need unshare() syscall.")
        sys.exit(-1)
    LIBC.setns(original_net_ns.fileno(), CLONE_NEWNET)

def new_ns():
    # 在一个新的名字中运行程序
    r = LIBC.unshare(CLONE_NEWNET)
    if r != 0:
        print("[!] Are you root? Need unshare() syscall.")
        sys.exit(-1)
    # Execute the command (a string) in a subshell.
    os.system("ip link set lo up")

def restore_ns():
    LIBC.setns(original_net_ns.fileno(), CLONE_NEWNET)


def do_iptables(action, sport, dport, extra):
    if sport:
        sport = '--sport %d' % (sport,)
        dport = ''
    else:
        sport = ''
        dport = '--dport %d' % (dport,)
    # 使用ip table
    os.system("iptables -%s INPUT -i lo -p tcp %s %s %s -j DROP" % (action, sport, dport, extra))

def drop_start(sport=None, dport=None, extra=''):
    do_iptables('I', sport, dport, extra)

def drop_stop(sport=None, dport=None, extra=''):
    do_iptables('D', sport, dport, extra)

# 执行一个shell的命令
tcpdump_bin = os.popen('which tcpdump').read().strip()
# 执行一个shell命令
ss_bin = os.popen('which ss').read().strip()

def tcpdump_start(port):
    # --packet-buffered 打印可打印字符
    # -n 不要主机地址转换成名字
    # Print a delta (micro-second resolution) between current and first line on each dump line
    p = subprocess.Popen(shlex.split('%s -B 16384 --packet-buffered -n -ttttt -i lo port %s' % (tcpdump_bin, port)))
    time.sleep(1)
    # 注册关掉回调
    def close():
        p.send_signal(signal.SIGINT)
        p.wait()
    p.close = close
    atexit.register(close)
    # 创建一个子进程
    return p

def ss(port):
    print(os.popen('%s -t -n -o -a dport = :%s or sport = :%s' % (ss_bin, port, port)).read())

def check_buffer(c):
    ti = tcp_info.from_socket(c)
    print("delivered, acked", ti.tcpi_bytes_acked-1)
    print("in-flight:", ti.tcpi_bytes_sent - ti.tcpi_bytes_retrans- ti.tcpi_bytes_acked+1)
    print("in queue, not in flight:", ti.tcpi_notsent_bytes)

def socket_info(c):
    ti = tcp_info.from_socket(c)
    acked = ti.tcpi_bytes_acked-1
    in_flight = ti.tcpi_bytes_sent - ti.tcpi_bytes_retrans- ti.tcpi_bytes_acked+1
    notsent = ti.tcpi_notsent_bytes
    return acked, in_flight, notsent
@BruceChen7
Copy link
Author

BruceChen7 commented Jul 10, 2020

资料

非阻塞的模式

image

⾮阻塞过程要复杂很多:

  • ⼀次完整的 IO 会话过程会被分割成多次的 IO 过程;
  • 每次 IO 过程需要缓存部分数据及当前会话的处理状态;
  • 要求解析器(如:Json/Xml/Mime 解析器)最好能⽀持流式解析⽅式,否则就需要读到完整数据后才能交给解析器去处理,当遇到业数据较⼤时就需要分配较⼤的连续内存块,必然会造成系统的内存分配压⼒;
  • 当前⼤部分后台系统(如数据库、存储系统、缓存系统)所提供的客户端驱动都是阻塞式的,⽆法直接应⽤在⾮阻塞通信应⽤中,从⽽限制了⾮阻塞通信⽅式的应⽤范围;
  • 多次 IO 过程将应⽤的业务处理逻辑分割的⽀离破碎,⼤⼤增加了业务编写过程的复杂度,降低了开发效率,同时加⼤了后期的不易维护性

网络编程和协程

image

当有⼤量的协程需要运⾏时,在『环形切换』模式下,前⼀个协程运⾏完毕后直接『唤醒』并切换⾄下⼀个协程,⽽⽆需象『星形切换』那样先切换⾄调度原点,再从调度原点来『唤醒』下⼀个协程;因『环形切换』⽐『星形切换』节省了⼀次上下⽂的切换过程,所以『环形切换』⽅式的切换效率更⾼。

image

在网络协程库中,内部有一个缺省的IO调度协程,其负责处理与网络IO相关的协程调度过程,故称之为IO调度协程

  • 每⼀个⽹络连接绑定⼀个套接字句柄,该套接字绑定⼀个协程;

  • 当对⽹络套接字进⾏读或写发生阻塞时,将该套接字添加⾄ IO 调度协程的事件引擎中并设置读写事件,然后将该协程挂起;这样所有处于读写等待状态的⽹络协程都被挂起,且与之关联的⽹络套接字均由 IO 调度协程的事件引擎统⼀监控管理;

  • 当某些⽹络套接字满⾜可读或可写条件时,IO 调度协程的事件引擎返回这些套接字的状态,IO 调度协程找到与这些套接字绑定的协程对象,然后将这些协程追加至协程调度队列中,使其依次运⾏;

IO 事件协程内部本身是由系统事件引擎(如:Linux 下的 epoll 事件引擎)驱动的,其内部 IO 事件的驱动机制和上⾯介绍的⾮阻塞过程相似,当某个套接字句柄『准备就绪』时,IO 调度协程便将其所绑定的协程添加进协程调度队列中,待本次 IO 调度协程返回后,会依次运⾏协程调度队列⾥的所有协程

@BruceChen7
Copy link
Author

BruceChen7 commented May 31, 2022

资料来源

file-max是设置系统所有进程一共可以打开的文件数量。同时程序也可以通过setrlimit调用设置每个进程的限制。

echo 2000500 > /proc/sys/fs/file-max或者 sysctl -w "fs.file-max=2000500"可以实时更改这个参数,但是重启之后会恢复为默认值。
也可以修改/etc/sysctl.conf, 加入fs.file-max = 2000500重启或者sysctl -w生效。

设置资源限制。首先修改/proc/sys/fs/nr_open,然后再用ulimit进行修改:

echo 2000500 > /proc/sys/fs/nr_open
ulimit -n 2000500

当然如果你想重启以后也会使用这些参数,你需要修改/etc/sysctl.conf中的fs.nr_open参数和/etc/security/limits.conf的参数:

# vi /etc/security/limits.conf

* soft nofile 2000500 
* hard nofile 2000500

如果你开启了iptables,iptalbes会使用nf_conntrack模块跟踪连接,而这个连接跟踪的数量是有最大值的,当跟踪的连接超过这个最大值,就会导致连接失败。 通过命令查看查看最大值

# cat /proc/sys/net/nf_conntrack_max
 1024000

对于我们的测试来说,为了我们的测试方便,可能需要一些网络协议栈的调优,可以根据个人的情况进行设置

sysctl -w fs.file-max=2000500
sysctl -w fs.nr_open=2000500
sysctl -w net.nf_conntrack_max=2000500
ulimit -n 2000500
sysctl -w net.ipv4.tcp_mem='131072  262144  524288'
sysctl -w net.ipv4.tcp_rmem='8760  256960  4088000'
sysctl -w net.ipv4.tcp_wmem='8760  256960  4088000'
sysctl -w net.core.rmem_max=16384
sysctl -w net.core.wmem_max=16384
sysctl -w net.core.somaxconn=2048
sysctl -w net.ipv4.tcp_max_syn_backlog=2048
sysctl -w /proc/sys/net/core/netdev_max_backlog=2048
sysctl -w net.ipv4.tcp_tw_recycle=1
sysctl -w net.ipv4.tcp_tw_reuse=1

@BruceChen7
Copy link
Author

BruceChen7 commented May 31, 2022

资料来源

goroutine per connection

// server.go
func main() {
	ln, err := net.Listen("tcp", ":8972")
	if err != nil {
		panic(err)
	}
	go func() {
		if err := http.ListenAndServe(":6060", nil); err != nil {
			log.Fatalf("pprof failed: %v", err)
		}
	}()
	var connections []net.Conn
	defer func() {
		for _, conn := range connections {
			conn.Close()
		}
	}()
	for {
		conn, e := ln.Accept()
		if e != nil {
			if ne, ok := e.(net.Error); ok && ne.Temporary() {
				log.Printf("accept temp err: %v", ne)
				continue
			}
			log.Printf("accept err: %v", e)
			return
		}
		go handleConn(conn)
		connections = append(connections, conn)
		if len(connections)%100 == 0 {
			log.Printf("total number of connections: %v", len(connections))
		}
	}
}
func handleConn(conn net.Conn) {
	io.Copy(ioutil.Discard, conn)
}
// client.go
var (
	ip          = flag.String("ip", "127.0.0.1", "server IP")
	connections = flag.Int("conn", 1, "number of tcp connections")
)
func main() {
	flag.Parse()
	addr := *ip + ":8972"
	log.Printf("连接到 %s", addr)
	var conns []net.Conn
	for i := 0; i < *connections; i++ {
		c, err := net.DialTimeout("tcp", addr, 10*time.Second)
		if err != nil {
			fmt.Println("failed to connect", i, err)
			i--
			continue
		}
		conns = append(conns, c)
		time.Sleep(time.Millisecond)
	}
	defer func() {
		for _, c := range conns {
			c.Close()
		}
	}()
	log.Printf("完成初始化 %d 连接", len(conns))
	tts := time.Second
	if *connections > 100 {
		tts = time.Millisecond * 5
	}
	for {
		for i := 0; i < len(conns); i++ {
			time.Sleep(tts)
			conn := conns[i]
			conn.Write([]byte("hello world\r\n"))
		}
	}
}

因为从一个IP连接到同一个服务器的某个端口最多也只能建立65535个连接,所以直接运行客户端没办法建立百万的连接。 Eran Yanay采用docker的方法确实让人眼前一亮(我以前都是通过手工设置多个ip的方式实现,采用docker的方式更简单)。

我们使用50个docker容器做客户端,每个建立2万个连接,总共建立一百万的连接。

./setup.sh 20000 50 172.17.0.1

setup.sh内容如下,使用几M大小的alpine docker镜像跑测试:

#!/bin/bash address, 缺省是 172.17.0.1
CONNECTIONS=$1
REPLICAS=$2
IP=$3
#go build --tags "static netgo" -o client client.go
for (( c=0; c<${REPLICAS}; c++ ))
do
    # 以这种方式运行
    docker run -v $(pwd)/client:/client --name 1mclient_$c -d alpine /client \
    -conn=${CONNECTIONS} -ip=${IP}
done

@BruceChen7
Copy link
Author

BruceChen7 commented Jun 6, 2022

使用epoll的方式

这里需要hack的方式,将net库中epoll暴露出来

var epoller *epoll
func main() {
	setLimit()
	ln, err := net.Listen("tcp", ":8972")
	if err != nil {
		panic(err)
	}
	go func() {
		if err := http.ListenAndServe(":6060", nil); err != nil {
			log.Fatalf("pprof failed: %v", err)
		}
	}()
	epoller, err = MkEpoll()
	if err != nil {
		panic(err)
	}
	go start()
	for {
		conn, e := ln.Accept()
		if e != nil {
			if ne, ok := e.(net.Error); ok && ne.Temporary() {
				log.Printf("accept temp err: %v", ne)
				continue
			}
			log.Printf("accept err: %v", e)
			return
		}
		if err := epoller.Add(conn); err != nil {
			log.Printf("failed to add connection %v", err)
			conn.Close()
		}
	}
}
func start() {
	var buf = make([]byte, 8)
	for {
		connections, err := epoller.Wait()
		if err != nil {
			log.Printf("failed to epoll wait %v", err)
			continue
		}
		for _, conn := range connections {
			if conn == nil {
				break
			}
			if _, err := conn.Read(buf); err != nil {
				if err := epoller.Remove(conn); err != nil {
					log.Printf("failed to remove %v", err)
				}
				conn.Close()
			}
		}
	}
}

epoll的抽象

type epoll struct {
	fd          int
	connections map[int]net.Conn
	lock        *sync.RWMutex
}
func MkEpoll() (*epoll, error) {
	fd, err := unix.EpollCreate1(0)
	if err != nil {
		return nil, err
	}
	return &epoll{
		fd:          fd,
		lock:        &sync.RWMutex{},
		connections: make(map[int]net.Conn),
	}, nil
}
func (e *epoll) Add(conn net.Conn) error {
	// Extract file descriptor associated with the connection
	fd := socketFD(conn)
	err := unix.EpollCtl(e.fd, syscall.EPOLL_CTL_ADD, fd, &unix.EpollEvent{Events: unix.POLLIN | unix.POLLHUP, Fd: int32(fd)})
	if err != nil {
		return err
	}
	e.lock.Lock()
	defer e.lock.Unlock()
	e.connections[fd] = conn
	if len(e.connections)%100 == 0 {
		log.Printf("total number of connections: %v", len(e.connections))
	}
	return nil
}
func (e *epoll) Remove(conn net.Conn) error {
	fd := socketFD(conn)
	err := unix.EpollCtl(e.fd, syscall.EPOLL_CTL_DEL, fd, nil)
	if err != nil {
		return err
	}
	e.lock.Lock()
	defer e.lock.Unlock()
	delete(e.connections, fd)
	if len(e.connections)%100 == 0 {
		log.Printf("total number of connections: %v", len(e.connections))
	}
	return nil
}
func (e *epoll) Wait() ([]net.Conn, error) {
	events := make([]unix.EpollEvent, 100)
	n, err := unix.EpollWait(e.fd, events, 100)
	if err != nil {
		return nil, err
	}
	e.lock.RLock()
	defer e.lock.RUnlock()
	var connections []net.Conn
	for i := 0; i < n; i++ {
		conn := e.connections[int(events[i].Fd)]
		connections = append(connections, conn)
	}
	return connections, nil
}
func socketFD(conn net.Conn) int {
	//tls := reflect.TypeOf(conn.UnderlyingConn()) == reflect.TypeOf(&tls.Conn{})
	// Extract the file descriptor associated with the connection
	//connVal := reflect.Indirect(reflect.ValueOf(conn)).FieldByName("conn").Elem()
	tcpConn := reflect.Indirect(reflect.ValueOf(conn)).FieldByName("conn")
	//if tls {
	//	tcpConn = reflect.Indirect(tcpConn.Elem())
	//}
	fdVal := tcpConn.FieldByName("fd")
	pfdVal := reflect.Indirect(fdVal).FieldByName("pfd")
	return int(pfdVal.FieldByName("Sysfd").Int())
}

可以看到建立连接后大约占了10G的内存,CPU占用非常小。

@BruceChen7
Copy link
Author

Raw socket

这里我们使用gopacket对各种协议层的包的定义,方便解析(或创建)TCP/IP各层的网络协议。

package main
import (
	"flag"
	"log"
	"net"
	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/smallnest/go-network-programming/codec"
	"golang.org/x/net/bpf"
	"golang.org/x/net/ipv4"
)
var (
	addr = flag.String("s", "localhost", "server address")
	port = flag.Int("p", 8972, "port")
)
var (
	stat         = make(map[string]int)
	lastStatTime = int64(0)
)
func main() {
	flag.Parse()
	conn, err := net.ListenPacket("ip4:udp", *addr)
	if err != nil {
		panic(err)
	}
	cc := conn.(*net.IPConn)
	cc.SetReadBuffer(20 * 1024 * 1024)
	cc.SetWriteBuffer(20 * 1024 * 1024)
	handleConn(conn)
}
func handleConn(conn net.PacketConn) {
	for {
		buffer := make([]byte, 1024)
		n, remoteaddr, err := conn.ReadFrom(buffer)
		if err != nil {
			log.Fatal(err)
		}
		buffer = buffer[:n]
		packet := gopacket.NewPacket(buffer, layers.LayerTypeUDP, gopacket.NoCopy)
		// Get the UDP layer from this packet
		if udpLayer := packet.Layer(layers.LayerTypeUDP); udpLayer != nil {
			udp, _ := udpLayer.(*layers.UDP)
			if app := packet.ApplicationLayer(); app != nil {
				data, err := codec.EncodeUDPPacket(net.ParseIP("127.0.0.1"), net.ParseIP("127.0.0.1"), uint16(udp.DstPort), uint16(udp.SrcPort), app.Payload())
				if err != nil {
					log.Printf("failed to EncodePacket: %v", err)
					return
				}
				if _, err := conn.WriteTo(data, remoteaddr); err != nil {
					log.Printf("failed to write packet: %v", err)
					conn.Close()
					return
				}
			}
		}
	}
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment