Created
November 11, 2015 17:12
-
-
Save askldjd/b75b6b6b8382a95aa6da to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket, struct, os, array | |
from scapy.all import ETH_P_ALL | |
from scapy.all import select | |
from scapy.all import MTU | |
class IPSniff: | |
def __init__(self, interface_name, on_ip_incoming, on_ip_outgoing): | |
self.interface_name = interface_name | |
self.on_ip_incoming = on_ip_incoming | |
self.on_ip_outgoing = on_ip_outgoing | |
# The raw in (listen) socket is a L2 raw socket that listens | |
# for all packets going through a specific interface. | |
self.ins = socket.socket( | |
socket.AF_PACKET, socket.SOCK_RAW, socket.htons(ETH_P_ALL)) | |
self.ins.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 2**30) | |
self.ins.bind((self.interface_name, ETH_P_ALL)) | |
def __process_ipframe(self, pkt_type, ip_header, payload): | |
# Extract the 20 bytes IP header, ignoring the IP options | |
fields = struct.unpack("!BBHHHBBHII", ip_header) | |
dummy_hdrlen = fields[0] & 0xf | |
iplen = fields[2] | |
ip_src = payload[12:16] | |
ip_dst = payload[16:20] | |
ip_frame = payload[0:iplen] | |
if pkt_type == socket.PACKET_OUTGOING: | |
if self.on_ip_outgoing is not None: | |
self.on_ip_outgoing(ip_src, ip_dst, ip_frame) | |
else: | |
if self.on_ip_incoming is not None: | |
self.on_ip_incoming(ip_src, ip_dst, ip_frame) | |
def recv(self): | |
while True: | |
pkt, sa_ll = self.ins.recvfrom(MTU) | |
if type == socket.PACKET_OUTGOING and self.on_ip_outgoing is None: | |
continue | |
elif self.on_ip_outgoing is None: | |
continue | |
if len(pkt) <= 0: | |
break | |
eth_header = struct.unpack("!6s6sH", pkt[0:14]) | |
dummy_eth_protocol = socket.ntohs(eth_header[2]) | |
if eth_header[2] != 0x800 : | |
continue | |
ip_header = pkt[14:34] | |
payload = pkt[14:] | |
self.__process_ipframe(sa_ll[2], ip_header, payload) | |
#Example code to use IPSniff | |
def test_incoming_callback(src, dst, frame): | |
#pass | |
print("incoming - src=%s, dst=%s, frame len = %d" | |
%(socket.inet_ntoa(src), socket.inet_ntoa(dst), len(frame))) | |
def test_outgoing_callback(src, dst, frame): | |
#pass | |
print("outgoing - src=%s, dst=%s, frame len = %d" | |
%(socket.inet_ntoa(src), socket.inet_ntoa(dst), len(frame))) | |
ip_sniff = IPSniff('eth3', test_incoming_callback, test_outgoing_callback) | |
ip_sniff.recv() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment