-
-
Save userbox020/1490f283f88125ad32690fbacc3212e4 to your computer and use it in GitHub Desktop.
simple scapy tcp three-way handshake
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: UTF-8 -*- | |
# Author : [email protected] <github.com/tintinweb> | |
''' | |
A simple TCP three-way handshake example | |
#> python scapy_tcp_handshake.py | |
DEBUG:__main__:init: ('oststrom.com', 80) | |
DEBUG:__main__:start | |
DEBUG:__main__:SND: SYN | |
DEBUG:__main__:RCV: SYN+ACK | |
DEBUG:__main__:SND: SYN+ACK -> ACK | |
DEBUG:__main__:RCV: None | |
DEBUG:__main__:RCV: None | |
None | |
DEBUG:__main__:SND: FIN | |
DEBUG:__main__:RCV: None | |
Note: linux might send an RST for forged SYN packets. Disable it by executing: | |
#> iptables -A OUTPUT -p tcp --tcp-flags RST RST -s <src_ip> -j DROP | |
''' | |
from scapy.all import * | |
import logging | |
logger = logging.getLogger(__name__) | |
class TcpHandshake(object): | |
def __init__(self, target): | |
self.seq = 0 | |
self.seq_next = 0 | |
self.target = target | |
self.dst = iter(Net(target[0])).next() | |
self.dport = target[1] | |
self.sport = random.randrange(0,2**16) | |
self.l4 = IP(dst=target[0])/TCP(sport=self.sport, dport=self.dport, flags=0, | |
seq=random.randrange(0,2**32)) | |
self.src = self.l4.src | |
self.swin = self.l4[TCP].window | |
self.dwin=1 | |
logger.debug("init: %s"%repr(target)) | |
def start(self): | |
logger.debug("start") | |
return self.send_syn() | |
def match_packet(self, pkt): | |
if pkt.haslayer(IP) and pkt[IP].dst == self.l4[IP].src \ | |
and pkt.haslayer(TCP) and pkt[TCP].dport == self.sport \ | |
and pkt[TCP].ack == self.seq_next: | |
return True | |
return False | |
def _sr1(self, pkt): | |
send(pkt) | |
ans = sniff(filter="tcp port %s"%self.target[1],lfilter=self.match_packet,count=1,timeout=1) | |
return ans[0] if ans else None | |
def handle_recv(self, pkt): | |
if pkt and pkt.haslayer(IP) and pkt.haslayer(TCP): | |
if pkt[TCP].flags & 0x3f == 0x12: # SYN+ACK | |
logger.debug("RCV: SYN+ACK") | |
return self.send_synack_ack(pkt) | |
elif pkt[TCP].flags & 4 != 0: # RST | |
logger.debug("RCV: RST") | |
raise Exception("RST") | |
elif pkt[TCP].flags & 0x1 == 1: # FIN | |
logger.debug("RCV: FIN") | |
return self.send_finack(pkt) | |
elif pkt[TCP].flags & 0x3f == 0x10: # FIN+ACK | |
logger.debug("RCV: FIN+ACK") | |
return self.send_ack(pkt) | |
logger.debug("RCV: %s"%repr(pkt)) | |
return None | |
def send_syn(self): | |
logger.debug("SND: SYN") | |
self.l4[TCP].flags = "S" | |
self.seq_next = self.l4[TCP].seq + 1 | |
response = self._sr1(self.l4) | |
self.l4[TCP].seq += 1 | |
return self.handle_recv(response) | |
def send_synack_ack(self, pkt): | |
logger.debug("SND: SYN+ACK -> ACK") | |
self.l4[TCP].ack = pkt[TCP].seq+1 | |
self.l4[TCP].flags = "A" | |
self.seq_next = self.l4[TCP].seq | |
response = self._sr1(self.l4) | |
return self.handle_recv(response) | |
def send_data(self, d): | |
self.l4[TCP].flags = "PA" | |
response = self._sr1(self.l4/d) | |
self.seq_next = self.l4[TCP].seq + len(d) | |
self.l4[TCP].seq += len(d) | |
return self.handle_recv(response) | |
def send_fin(self): | |
logger.debug("SND: FIN") | |
self.l4[TCP].flags = "F" | |
self.seq_next = self.l4[TCP].seq + 1 | |
response = self._sr1(self.l4) | |
self.l4[TCP].seq += 1 | |
return self.handle_recv(response) | |
def send_finack(self, pkt): | |
logger.debug("SND: FIN+ACK") | |
self.l4[TCP].flags = "FA" | |
self.l4[TCP].ack = pkt[TCP].seq+1 | |
self.seq_next = self.l4[TCP].seq + 1 | |
response = send(self.l4) | |
self.l4[TCP].seq += 1 | |
raise Exception("FIN+ACK") | |
def send_ack(self, pkt): | |
logger.debug("SND: ACK") | |
self.l4[TCP].flags = "A" | |
self.l4[TCP].ack = pkt[TCP].seq+1 | |
self.seq_next = self.l4[TCP].seq + 1 | |
response = self._sr1(self.l4) | |
self.l4[TCP].seq += 1 | |
if __name__=='__main__': | |
logging.basicConfig(level=logging.DEBUG) | |
logger.setLevel(logging.DEBUG) | |
conf.verb = 0 | |
tcp_hs = TcpHandshake(("oststrom.com",80)) | |
tcp_hs.start() | |
print repr(tcp_hs.send_data("INTENTIONAL BAD REQUEST\r\n\r\n\r\n")) | |
tcp_hs.send_fin() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment