-
-
Save tintinweb/8523a9a43a2fb61a6770 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python | |
# -*- coding: UTF-8 -*- | |
# Author : [email protected] <github.com/tintinweb> | |
''' | |
A simple TCP three-way handshake example | |
#> python scapy_tcp_handshake.py | |
DEBUG:__main__:init: ('oststrom.com', 80) | |
DEBUG:__main__:start | |
DEBUG:__main__:SND: SYN | |
DEBUG:__main__:RCV: SYN+ACK | |
DEBUG:__main__:SND: SYN+ACK -> ACK | |
DEBUG:__main__:RCV: None | |
DEBUG:__main__:RCV: None | |
None | |
DEBUG:__main__:SND: FIN | |
DEBUG:__main__:RCV: None | |
Note: linux might send an RST for forged SYN packets. Disable it by executing: | |
#> iptables -A OUTPUT -p tcp --tcp-flags RST RST -s <src_ip> -j DROP | |
''' | |
from scapy.all import * | |
import logging | |
logger = logging.getLogger(__name__) | |
class TcpHandshake(object): | |
def __init__(self, target): | |
self.seq = 0 | |
self.seq_next = 0 | |
self.target = target | |
self.dst = iter(Net(target[0])).next() | |
self.dport = target[1] | |
self.sport = random.randrange(0,2**16) | |
self.l4 = IP(dst=target[0])/TCP(sport=self.sport, dport=self.dport, flags=0, | |
seq=random.randrange(0,2**32)) | |
self.src = self.l4.src | |
self.swin = self.l4[TCP].window | |
self.dwin=1 | |
logger.debug("init: %s"%repr(target)) | |
def start(self): | |
logger.debug("start") | |
return self.send_syn() | |
def match_packet(self, pkt): | |
if pkt.haslayer(IP) and pkt[IP].dst == self.l4[IP].src \ | |
and pkt.haslayer(TCP) and pkt[TCP].dport == self.sport \ | |
and pkt[TCP].ack == self.seq_next: | |
return True | |
return False | |
def _sr1(self, pkt): | |
send(pkt) | |
ans = sniff(filter="tcp port %s"%self.target[1],lfilter=self.match_packet,count=1,timeout=1) | |
return ans[0] if ans else None | |
def handle_recv(self, pkt): | |
if pkt and pkt.haslayer(IP) and pkt.haslayer(TCP): | |
if pkt[TCP].flags & 0x3f == 0x12: # SYN+ACK | |
logger.debug("RCV: SYN+ACK") | |
return self.send_synack_ack(pkt) | |
elif pkt[TCP].flags & 4 != 0: # RST | |
logger.debug("RCV: RST") | |
raise Exception("RST") | |
elif pkt[TCP].flags & 0x1 == 1: # FIN | |
logger.debug("RCV: FIN") | |
return self.send_finack(pkt) | |
elif pkt[TCP].flags & 0x3f == 0x10: # FIN+ACK | |
logger.debug("RCV: FIN+ACK") | |
return self.send_ack(pkt) | |
logger.debug("RCV: %s"%repr(pkt)) | |
return None | |
def send_syn(self): | |
logger.debug("SND: SYN") | |
self.l4[TCP].flags = "S" | |
self.seq_next = self.l4[TCP].seq + 1 | |
response = self._sr1(self.l4) | |
self.l4[TCP].seq += 1 | |
return self.handle_recv(response) | |
def send_synack_ack(self, pkt): | |
logger.debug("SND: SYN+ACK -> ACK") | |
self.l4[TCP].ack = pkt[TCP].seq+1 | |
self.l4[TCP].flags = "A" | |
self.seq_next = self.l4[TCP].seq | |
response = self._sr1(self.l4) | |
return self.handle_recv(response) | |
def send_data(self, d): | |
self.l4[TCP].flags = "PA" | |
response = self._sr1(self.l4/d) | |
self.seq_next = self.l4[TCP].seq + len(d) | |
self.l4[TCP].seq += len(d) | |
return self.handle_recv(response) | |
def send_fin(self): | |
logger.debug("SND: FIN") | |
self.l4[TCP].flags = "F" | |
self.seq_next = self.l4[TCP].seq + 1 | |
response = self._sr1(self.l4) | |
self.l4[TCP].seq += 1 | |
return self.handle_recv(response) | |
def send_finack(self, pkt): | |
logger.debug("SND: FIN+ACK") | |
self.l4[TCP].flags = "FA" | |
self.l4[TCP].ack = pkt[TCP].seq+1 | |
self.seq_next = self.l4[TCP].seq + 1 | |
response = send(self.l4) | |
self.l4[TCP].seq += 1 | |
raise Exception("FIN+ACK") | |
def send_ack(self, pkt): | |
logger.debug("SND: ACK") | |
self.l4[TCP].flags = "A" | |
self.l4[TCP].ack = pkt[TCP].seq+1 | |
self.seq_next = self.l4[TCP].seq + 1 | |
response = self._sr1(self.l4) | |
self.l4[TCP].seq += 1 | |
if __name__=='__main__': | |
logging.basicConfig(level=logging.DEBUG) | |
logger.setLevel(logging.DEBUG) | |
conf.verb = 0 | |
tcp_hs = TcpHandshake(("oststrom.com",80)) | |
tcp_hs.start() | |
print repr(tcp_hs.send_data("INTENTIONAL BAD REQUEST\r\n\r\n\r\n")) | |
tcp_hs.send_fin() |
If an incoming packet arrives in a time gap between send and sniff (lines 54-55), it will be lost, because sniff does not receive packets that arrived before it was called.
I am interested in testing how many tcp sessions a device can handle, such as a stateful firewall or NAT device. My thought is to have 2 hosts running scapy connected to the device being tested. I could use a script like this (with many iterations, changing IP address and/or port) to act as a TCP client. Can similar be done to emulate a TCP server? Can Scapy receive TCP SYN packets and manually process them with a similar script set to receive SYN-ACK and respond with ACK, or will that host machine kernel always try to respond to the TCP session? My goal is for Scapy to be stateless in the process such that the test is not be limited by the kernel of the hosts running the handshake emulation.
Can you please provide an example that how your file could be used with an application layer built in scapy? The purpose is to connect to a server and receive the data. In my case new layer of protocol has already been implemented. Just need to equip 3way handshake and sequence and acks at TCP level so that it could function properly.
I am new to whole this stuff. Please take it into consideration while answering. FYI, new layer is of S/IP, sercos interface protocol.