Last active November 10, 2022 11:53
Scapy TCP Keepalive simulator - Proof of Concept / Single Connection
'''
This is a proof of concept that simulates the TCP keepalive probes sent by the OS to
keep a connection alive and check that it is still active.

The simulation is partial: only the ACK packets are sent to the other end, and the
response is not checked.
In addition, this script supports only one connection, automatically identified using
the destination IP and port.

I developed this script to work around a limitation of an ODBC connector used by an
application running inside a Kubernetes container, with the node using a gateway that
kills connections after 5 minutes of inactivity.

The RFC (RFC 1122) states that the keepalive ACK packet must use SEQ NUMBER - 1 as its
sequence number, so this script sniffs the connections to identify the one we are
interested in and keeps track of the SEQ NUMBER and the timestamp of the last sent
packet to simulate the behaviour of the OS.

You need to install scapy, I used scapy 2.36.

Usage:
    scapy-tcp-keepalive-simulator.py DESTINATION_IP DESTINATION_PORT KEEPALIVE_PROBE_TIME_IN_MINUTES KEEPALIVE_PROBE_INTERVAL
    (KEEPALIVE_PROBE_INTERVAL is in seconds)

E.g.
    scapy-tcp-keepalive-simulator.py 216.58.213.196 80 3 180

Todo List
- support multiple connections
- detect connection reset / abort / end (i.e. FIN/RST flags)
- refactor the code to use classes
- properly support multithreading (i.e. handle the thread loop properly)
- do not print out anything, it's useless
'''
import sys
from datetime import datetime, timedelta
import time
from scapy.all import IP, TCP, send, sniff
from threading import Thread

# Command-line arguments (see the usage notes in the docstring above)
destination_ip = sys.argv[1]
destination_port = int(sys.argv[2])
keepalive_probe_time = int(sys.argv[3])      # idle time before probing, in minutes
keepalive_probe_interval = int(sys.argv[4])  # pause after each probe, in seconds

# State of the tracked connection, shared between the two threads
source_ip = None
source_port = None
last_seq = 0
last_ack = 0
last_update = None

def sniffer_handle_packet(packet):
    # last_ack must be declared global too, otherwise the assignment below
    # only creates a local variable and the probes are sent with ack=0
    global last_update, last_seq, last_ack, source_ip, source_port

    if not packet.haslayer(IP) or not packet.haslayer(TCP):
        return

    # Only track packets going to the configured destination
    if packet[IP].dst != destination_ip or packet[TCP].dport != destination_port:
        return

    source_ip = packet[IP].src
    source_port = packet[TCP].sport
    last_seq = packet[TCP].seq
    last_ack = packet[TCP].ack
    last_update = datetime.now()

    # The string returned from the prn callback is printed by sniff()
    return '[{}] Received TCP/IP Packet: {}:{} ==> {}:{}, flags[{}], seq {}, ack {}'.format(
        datetime.now(),
        packet[IP].src, packet[TCP].sport,
        packet[IP].dst, packet[TCP].dport,
        packet[TCP].flags, packet[TCP].seq, packet[TCP].ack)


def sniffer_thread_main():
    sniff(filter="tcp", prn=sniffer_handle_packet, store=0)

def keepalive_thread_main():
    # This function only reads the module-level state, so no global declaration is needed
    while True:
        # Probe only once a packet has been seen and the connection has been idle long enough
        if (last_update is not None) and ((datetime.now() - timedelta(minutes=keepalive_probe_time)) > last_update):
            ip = IP(src=source_ip, dst=destination_ip)
            # Keepalive probe: same ack number, previous seq number minus 1
            # (modulo 2**32 so that a seq of 0 does not become negative)
            ACK = TCP(sport=source_port, dport=destination_port, flags='A',
                      ack=last_ack, seq=(last_seq - 1) % 2**32)
            packet = ip/ACK
            send(packet)

            print('[{}] Sending TCP/IP Packet: {}:{} ==> {}:{}, flags[{}], seq {}, ack {}'.format(
                datetime.now(),
                packet[IP].src, packet[TCP].sport,
                packet[IP].dst, packet[TCP].dport,
                packet[TCP].flags, packet[TCP].seq, packet[TCP].ack))

            time.sleep(keepalive_probe_interval)

        time.sleep(1)

def app_main():
    sniffer_thread = Thread(target=sniffer_thread_main)
    sniffer_thread.start()

    keepalive_thread = Thread(target=keepalive_thread_main)
    keepalive_thread.start()

    sniffer_thread.join()
    keepalive_thread.join()


if __name__ == '__main__':
    app_main()
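
The to-do list in the docstring mentions refactoring to classes and handling the threads properly. A minimal sketch of what that could look like, assuming a hypothetical `ConnectionState` class (not part of the gist) that replaces the module-level globals, guards them with a lock, and takes the clock as a parameter so the idle check can be exercised without waiting:

```python
import threading
import time


class ConnectionState:
    """Hypothetical container for the fields the script keeps in globals."""

    def __init__(self, idle_seconds, clock=time.monotonic):
        self._lock = threading.Lock()
        self._clock = clock
        self.idle_seconds = idle_seconds
        self.seq = 0
        self.ack = 0
        self.last_update = None  # None until the first packet is seen

    def update(self, seq, ack):
        """Record the numbers of the latest sniffed packet (sniffer thread)."""
        with self._lock:
            self.seq = seq
            self.ack = ack
            self.last_update = self._clock()

    def snapshot_if_idle(self):
        """Return (seq, ack) if idle for longer than idle_seconds, else None."""
        with self._lock:
            if self.last_update is None:
                return None
            if self._clock() - self.last_update <= self.idle_seconds:
                return None
            return self.seq, self.ack
```

The sniffer callback would call `update()` and the keepalive loop would call `snapshot_if_idle()`, so both threads read and write a consistent pair of numbers.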
The TCP keepalive should have the same ack number as the previous packet, and its seq number should be the previous seq number minus 1.
The script needs to be modified accordingly.
Hey @manojpapisetty, thanks for pointing out the issue in the implementation, I just fixed it!
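
The rule discussed in this exchange can be sketched as a small pure function. This is only an illustration: `keepalive_fields` is a hypothetical helper name, and it assumes `last_seq` and `last_ack` are taken from the most recent packet sent on the connection, as the script does. TCP sequence numbers are 32-bit unsigned values, so the subtraction wraps around instead of going negative.

```python
SEQ_MODULUS = 2 ** 32  # TCP sequence numbers are 32-bit unsigned values


def keepalive_fields(last_seq, last_ack):
    """Return (seq, ack) for a keepalive probe built from the last packet seen."""
    probe_seq = (last_seq - 1) % SEQ_MODULUS  # a last_seq of 0 wraps to 4294967295
    return probe_seq, last_ack  # the ack number is reused unchanged
```

For example, `keepalive_fields(1000, 2000)` gives `(999, 2000)`.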
`last_ack` is missing here: the `global` statement in `sniffer_handle_packet` does not include it.
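
Why the missing `last_ack` matters can be shown with a standalone snippet, independent of scapy. The function names are made up for this illustration; only the variable name matches the script. Without a `global` declaration, an assignment inside a function creates a local variable and the module-level value never changes.

```python
last_ack = 0


def handler_without_global(value):
    # No global declaration: this binds a new local variable named last_ack
    last_ack = value
    return last_ack


def handler_with_global(value):
    global last_ack  # the assignment now rebinds the module-level name
    last_ack = value


handler_without_global(1234)
unchanged = last_ack  # the module-level value was not touched

handler_with_global(1234)
changed = last_ack  # the module-level value was updated
```

In the script this means the keepalive thread would keep reading the initial `last_ack` value unless the handler declares it as global.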