Last active
October 7, 2024 15:46
-
-
Save tintinweb/8523a9a43a2fb61a6770 to your computer and use it in GitHub Desktop.
simple scapy tcp three-way handshake
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: UTF-8 -*- | |
# Author : [email protected] <github.com/tintinweb> | |
''' | |
A simple TCP three-way handshake example | |
#> python scapy_tcp_handshake.py | |
DEBUG:__main__:init: ('oststrom.com', 80) | |
DEBUG:__main__:start | |
DEBUG:__main__:SND: SYN | |
DEBUG:__main__:RCV: SYN+ACK | |
DEBUG:__main__:SND: SYN+ACK -> ACK | |
DEBUG:__main__:RCV: None | |
DEBUG:__main__:RCV: None | |
None | |
DEBUG:__main__:SND: FIN | |
DEBUG:__main__:RCV: None | |
Note: linux might send an RST for forged SYN packets. Disable it by executing: | |
#> iptables -A OUTPUT -p tcp --tcp-flags RST RST -s <src_ip> -j DROP | |
''' | |
from scapy.all import * | |
import logging | |
logger = logging.getLogger(__name__) | |
class TcpHandshake(object): | |
def __init__(self, target): | |
self.seq = 0 | |
self.seq_next = 0 | |
self.target = target | |
self.dst = iter(Net(target[0])).next() | |
self.dport = target[1] | |
self.sport = random.randrange(0,2**16) | |
self.l4 = IP(dst=target[0])/TCP(sport=self.sport, dport=self.dport, flags=0, | |
seq=random.randrange(0,2**32)) | |
self.src = self.l4.src | |
self.swin = self.l4[TCP].window | |
self.dwin=1 | |
logger.debug("init: %s"%repr(target)) | |
def start(self): | |
logger.debug("start") | |
return self.send_syn() | |
def match_packet(self, pkt): | |
if pkt.haslayer(IP) and pkt[IP].dst == self.l4[IP].src \ | |
and pkt.haslayer(TCP) and pkt[TCP].dport == self.sport \ | |
and pkt[TCP].ack == self.seq_next: | |
return True | |
return False | |
def _sr1(self, pkt): | |
send(pkt) | |
ans = sniff(filter="tcp port %s"%self.target[1],lfilter=self.match_packet,count=1,timeout=1) | |
return ans[0] if ans else None | |
def handle_recv(self, pkt): | |
if pkt and pkt.haslayer(IP) and pkt.haslayer(TCP): | |
if pkt[TCP].flags & 0x3f == 0x12: # SYN+ACK | |
logger.debug("RCV: SYN+ACK") | |
return self.send_synack_ack(pkt) | |
elif pkt[TCP].flags & 4 != 0: # RST | |
logger.debug("RCV: RST") | |
raise Exception("RST") | |
elif pkt[TCP].flags & 0x1 == 1: # FIN | |
logger.debug("RCV: FIN") | |
return self.send_finack(pkt) | |
elif pkt[TCP].flags & 0x3f == 0x10: # FIN+ACK | |
logger.debug("RCV: FIN+ACK") | |
return self.send_ack(pkt) | |
logger.debug("RCV: %s"%repr(pkt)) | |
return None | |
def send_syn(self): | |
logger.debug("SND: SYN") | |
self.l4[TCP].flags = "S" | |
self.seq_next = self.l4[TCP].seq + 1 | |
response = self._sr1(self.l4) | |
self.l4[TCP].seq += 1 | |
return self.handle_recv(response) | |
def send_synack_ack(self, pkt): | |
logger.debug("SND: SYN+ACK -> ACK") | |
self.l4[TCP].ack = pkt[TCP].seq+1 | |
self.l4[TCP].flags = "A" | |
self.seq_next = self.l4[TCP].seq | |
response = self._sr1(self.l4) | |
return self.handle_recv(response) | |
def send_data(self, d): | |
self.l4[TCP].flags = "PA" | |
response = self._sr1(self.l4/d) | |
self.seq_next = self.l4[TCP].seq + len(d) | |
self.l4[TCP].seq += len(d) | |
return self.handle_recv(response) | |
def send_fin(self): | |
logger.debug("SND: FIN") | |
self.l4[TCP].flags = "F" | |
self.seq_next = self.l4[TCP].seq + 1 | |
response = self._sr1(self.l4) | |
self.l4[TCP].seq += 1 | |
return self.handle_recv(response) | |
def send_finack(self, pkt): | |
logger.debug("SND: FIN+ACK") | |
self.l4[TCP].flags = "FA" | |
self.l4[TCP].ack = pkt[TCP].seq+1 | |
self.seq_next = self.l4[TCP].seq + 1 | |
response = send(self.l4) | |
self.l4[TCP].seq += 1 | |
raise Exception("FIN+ACK") | |
def send_ack(self, pkt): | |
logger.debug("SND: ACK") | |
self.l4[TCP].flags = "A" | |
self.l4[TCP].ack = pkt[TCP].seq+1 | |
self.seq_next = self.l4[TCP].seq + 1 | |
response = self._sr1(self.l4) | |
self.l4[TCP].seq += 1 | |
if __name__=='__main__': | |
logging.basicConfig(level=logging.DEBUG) | |
logger.setLevel(logging.DEBUG) | |
conf.verb = 0 | |
tcp_hs = TcpHandshake(("oststrom.com",80)) | |
tcp_hs.start() | |
print repr(tcp_hs.send_data("INTENTIONAL BAD REQUEST\r\n\r\n\r\n")) | |
tcp_hs.send_fin() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I am interested in testing how many tcp sessions a device can handle, such as a stateful firewall or NAT device. My thought is to have 2 hosts running scapy connected to the device being tested. I could use a script like this (with many iterations, changing IP address and/or port) to act as a TCP client. Can similar be done to emulate a TCP server? Can Scapy receive TCP SYN packets and manually process them with a similar script set to receive SYN-ACK and respond with ACK, or will that host machine kernel always try to respond to the TCP session? My goal is for Scapy to be stateless in the process such that the test is not be limited by the kernel of the hosts running the handshake emulation.