import select
import socket
if __name__ == "__main__":
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
s.setblocking(False)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('', 4000))
s.listen(64)
ep = select.epoll()
ep.register(s.fileno(), select.EPOLLIN)
connections = {}
while True:
events = ep.poll(1)
for fd, event in events:
if fd == s.fileno() and event == select.EPOLLIN:
conn, addr = s.accept()
print "connection ", conn, "is connected"
conn.setblocking(False)
ep.register(conn.fileno(), select.EPOLLIN)
connections[conn.fileno()] = conn
continue
if event == select.EPOLLIN:
assert connections.get(fd) != None
c = connections.get(fd)
# recevie 1 byte from the tcp connection on purpose
# it's to validate the idea of lt mode reading
msg = c.recv(1)
print msg
Last active
August 17, 2022 14:13
-
-
Save BruceChen7/a710a21e6d6c1cd4c43aa0123bac98e3 to your computer and use it in GitHub Desktop.
[#dict#epoll] #epoll #python #dict#namespace #docker
创建一个字典,和给字典赋值都是比较简单的,按照下面,即可创建。
>>> dict1 = {}
>>> dict2 = {'name': 'earth','port': 80}
当然,我们也可以使用工厂方法来创建
>>> fdict = dict((['x',1],['y',2]))
>>> fdict
{'y':2,'x':1}
>>> fdict = dict(name= "hello",sex ="male")
{"name":"hello","sex":"male"}
要遍历一个字典,使用in
操作,然后通过字典键加上括号来访问其值
>>> dict2 = {'name': 'earth','port': 80}
>>> for key in dict2
>>> print 'key= %s,value = %s' %(key,dict2[key])
如果访问一个不存在的键将会报错。
可以有几种方式对一个字典做修改:添加一个数据项或者是新元素,修改一个已经存在的数据项,删除一个已经存在的数据项。
>>>dict2['name'] = 'venus'
>>>dict2['port'] = 6969
如果字典中该键已经存在,则字典中该键对应的值将被新值替代.
通常删除整个字典的操作是不常见的,下面记录了删除的代码:
del dict2['name']#删除name的条目
dict2.clear() #删除整个条目
在python字典中输入:dict.xxx
。
构造字典对象
字典构造
- len(dict)获取字典项目
- clear() 清除所有的项
- copy() 浅拷贝复制
- fromkeys(seq,[,value]) 创建一个新的字典,键来自seq,值设置为value。
- get(key[,default])获取某个键的值,如果没有该键,默认返回None,当然你可以改变默认值
- items()将所有的字典项以列表的方式返回,其中每一项都是来自于键和值。(这个键和值共同组成元祖)但在返回的时候, 没有特殊顺序。
- iteritems()的作用相同,但是返回的是迭代器对象而不是列表。
- keys() 将字典中的键以列表的方式返回。
- pop(key[,default]) 如果键值存在,那么直接删除该键-值对,并且返回其值,如果default没有给,并且键不存在,那么会产生KeyError
设置key字典的pattern
my_dict.setdefault(key, []).append(new_value)
# 等价于
if key not in my_dict:
my_dict[key] = []
my_dict[key].append(new_value)
def remove_prefix(text, prefix):
if text.startswith(prefix):
return text[len(prefix):]
return text
tcpdump or iptables or new net namespace
# -*-coding:utf8 -*-
import atexit
import ctypes
import io
import os
import shlex
import signal
import socket
import subprocess
import sys
import tcp_info
import time
LIBC = ctypes.CDLL("libc.so.6")
CLONE_NEWNET = 0x40000000
original_net_ns = open("/proc/self/ns/net", 'rb')
if True:
r = LIBC.unshare(CLONE_NEWNET)
if r != 0:
print("[!] Are you root? Need unshare() syscall.")
sys.exit(-1)
LIBC.setns(original_net_ns.fileno(), CLONE_NEWNET)
def new_ns():
# 在一个新的名字中运行程序
r = LIBC.unshare(CLONE_NEWNET)
if r != 0:
print("[!] Are you root? Need unshare() syscall.")
sys.exit(-1)
# Execute the command (a string) in a subshell.
os.system("ip link set lo up")
def restore_ns():
LIBC.setns(original_net_ns.fileno(), CLONE_NEWNET)
def do_iptables(action, sport, dport, extra):
if sport:
sport = '--sport %d' % (sport,)
dport = ''
else:
sport = ''
dport = '--dport %d' % (dport,)
# 使用ip table
os.system("iptables -%s INPUT -i lo -p tcp %s %s %s -j DROP" % (action, sport, dport, extra))
def drop_start(sport=None, dport=None, extra=''):
do_iptables('I', sport, dport, extra)
def drop_stop(sport=None, dport=None, extra=''):
do_iptables('D', sport, dport, extra)
# 执行一个shell的命令
tcpdump_bin = os.popen('which tcpdump').read().strip()
# 执行一个shell命令
ss_bin = os.popen('which ss').read().strip()
def tcpdump_start(port):
# --packet-buffered 打印可打印字符
# -n 不要主机地址转换成名字
# Print a delta (micro-second resolution) between current and first line on each dump line
p = subprocess.Popen(shlex.split('%s -B 16384 --packet-buffered -n -ttttt -i lo port %s' % (tcpdump_bin, port)))
time.sleep(1)
# 注册关掉回调
def close():
p.send_signal(signal.SIGINT)
p.wait()
p.close = close
atexit.register(close)
# 创建一个子进程
return p
def ss(port):
print(os.popen('%s -t -n -o -a dport = :%s or sport = :%s' % (ss_bin, port, port)).read())
def check_buffer(c):
ti = tcp_info.from_socket(c)
print("delivered, acked", ti.tcpi_bytes_acked-1)
print("in-flight:", ti.tcpi_bytes_sent - ti.tcpi_bytes_retrans- ti.tcpi_bytes_acked+1)
print("in queue, not in flight:", ti.tcpi_notsent_bytes)
def socket_info(c):
ti = tcp_info.from_socket(c)
acked = ti.tcpi_bytes_acked-1
in_flight = ti.tcpi_bytes_sent - ti.tcpi_bytes_retrans- ti.tcpi_bytes_acked+1
notsent = ti.tcpi_notsent_bytes
return acked, in_flight, notsent
使用epoll的方式
这里需要hack的方式,将net库中epoll暴露出来
var epoller *epoll
func main() {
setLimit()
ln, err := net.Listen("tcp", ":8972")
if err != nil {
panic(err)
}
go func() {
if err := http.ListenAndServe(":6060", nil); err != nil {
log.Fatalf("pprof failed: %v", err)
}
}()
epoller, err = MkEpoll()
if err != nil {
panic(err)
}
go start()
for {
conn, e := ln.Accept()
if e != nil {
if ne, ok := e.(net.Error); ok && ne.Temporary() {
log.Printf("accept temp err: %v", ne)
continue
}
log.Printf("accept err: %v", e)
return
}
if err := epoller.Add(conn); err != nil {
log.Printf("failed to add connection %v", err)
conn.Close()
}
}
}
func start() {
var buf = make([]byte, 8)
for {
connections, err := epoller.Wait()
if err != nil {
log.Printf("failed to epoll wait %v", err)
continue
}
for _, conn := range connections {
if conn == nil {
break
}
if _, err := conn.Read(buf); err != nil {
if err := epoller.Remove(conn); err != nil {
log.Printf("failed to remove %v", err)
}
conn.Close()
}
}
}
}
epoll的抽象
type epoll struct {
fd int
connections map[int]net.Conn
lock *sync.RWMutex
}
func MkEpoll() (*epoll, error) {
fd, err := unix.EpollCreate1(0)
if err != nil {
return nil, err
}
return &epoll{
fd: fd,
lock: &sync.RWMutex{},
connections: make(map[int]net.Conn),
}, nil
}
func (e *epoll) Add(conn net.Conn) error {
// Extract file descriptor associated with the connection
fd := socketFD(conn)
err := unix.EpollCtl(e.fd, syscall.EPOLL_CTL_ADD, fd, &unix.EpollEvent{Events: unix.POLLIN | unix.POLLHUP, Fd: int32(fd)})
if err != nil {
return err
}
e.lock.Lock()
defer e.lock.Unlock()
e.connections[fd] = conn
if len(e.connections)%100 == 0 {
log.Printf("total number of connections: %v", len(e.connections))
}
return nil
}
func (e *epoll) Remove(conn net.Conn) error {
fd := socketFD(conn)
err := unix.EpollCtl(e.fd, syscall.EPOLL_CTL_DEL, fd, nil)
if err != nil {
return err
}
e.lock.Lock()
defer e.lock.Unlock()
delete(e.connections, fd)
if len(e.connections)%100 == 0 {
log.Printf("total number of connections: %v", len(e.connections))
}
return nil
}
func (e *epoll) Wait() ([]net.Conn, error) {
events := make([]unix.EpollEvent, 100)
n, err := unix.EpollWait(e.fd, events, 100)
if err != nil {
return nil, err
}
e.lock.RLock()
defer e.lock.RUnlock()
var connections []net.Conn
for i := 0; i < n; i++ {
conn := e.connections[int(events[i].Fd)]
connections = append(connections, conn)
}
return connections, nil
}
func socketFD(conn net.Conn) int {
//tls := reflect.TypeOf(conn.UnderlyingConn()) == reflect.TypeOf(&tls.Conn{})
// Extract the file descriptor associated with the connection
//connVal := reflect.Indirect(reflect.ValueOf(conn)).FieldByName("conn").Elem()
tcpConn := reflect.Indirect(reflect.ValueOf(conn)).FieldByName("conn")
//if tls {
// tcpConn = reflect.Indirect(tcpConn.Elem())
//}
fdVal := tcpConn.FieldByName("fd")
pfdVal := reflect.Indirect(fdVal).FieldByName("pfd")
return int(pfdVal.FieldByName("Sysfd").Int())
}
可以看到建立连接后大约占了10G的内存,CPU占用非常小。
Raw socket
这里我们使用gopacket对各种协议层的包的定义,方便解析(或创建)TCP/IP各层的网络协议。
package main
import (
"flag"
"log"
"net"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/smallnest/go-network-programming/codec"
"golang.org/x/net/bpf"
"golang.org/x/net/ipv4"
)
var (
addr = flag.String("s", "localhost", "server address")
port = flag.Int("p", 8972, "port")
)
var (
stat = make(map[string]int)
lastStatTime = int64(0)
)
func main() {
flag.Parse()
conn, err := net.ListenPacket("ip4:udp", *addr)
if err != nil {
panic(err)
}
cc := conn.(*net.IPConn)
cc.SetReadBuffer(20 * 1024 * 1024)
cc.SetWriteBuffer(20 * 1024 * 1024)
handleConn(conn)
}
func handleConn(conn net.PacketConn) {
for {
buffer := make([]byte, 1024)
n, remoteaddr, err := conn.ReadFrom(buffer)
if err != nil {
log.Fatal(err)
}
buffer = buffer[:n]
packet := gopacket.NewPacket(buffer, layers.LayerTypeUDP, gopacket.NoCopy)
// Get the UDP layer from this packet
if udpLayer := packet.Layer(layers.LayerTypeUDP); udpLayer != nil {
udp, _ := udpLayer.(*layers.UDP)
if app := packet.ApplicationLayer(); app != nil {
data, err := codec.EncodeUDPPacket(net.ParseIP("127.0.0.1"), net.ParseIP("127.0.0.1"), uint16(udp.DstPort), uint16(udp.SrcPort), app.Payload())
if err != nil {
log.Printf("failed to EncodePacket: %v", err)
return
}
if _, err := conn.WriteTo(data, remoteaddr); err != nil {
log.Printf("failed to write packet: %v", err)
conn.Close()
return
}
}
}
}
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
资料来源
goroutine per connection
因为从一个IP连接到同一个服务器的某个端口最多也只能建立65535个连接,所以直接运行客户端没办法建立百万的连接。 Eran Yanay采用docker的方法确实让人眼前一亮(我以前都是通过手工设置多个ip的方式实现,采用docker的方式更简单)。
我们使用50个docker容器做客户端,每个建立2万个连接,总共建立一百万的连接。
setup.sh内容如下,使用几M大小的alpine docker镜像跑测试: