Skip to content

Instantly share code, notes, and snippets.

@danielealbano
Last active November 10, 2022 11:53
Show Gist options
  • Save danielealbano/6a2bfb99b309c50a3e1f3af9bf5ac61b to your computer and use it in GitHub Desktop.
Save danielealbano/6a2bfb99b309c50a3e1f3af9bf5ac61b to your computer and use it in GitHub Desktop.
Scapy TCP Keepalive simulator - Proof of Concept / Single Connection
'''
This is a proof of concept that simulates the tcp keepalive probes sent from the SO to
keep alive a connection and check that it is still active.
The simulation is partial, only the ACK packets are sent to the other end, the response
is not checked.
In addition, this script support only one connection automatically identified using
the destination ip and port.
I developed this script to workaround a limitation of an odbc connector used by an
application running inside a kubernetes container with the node using a gateway that
kills the connections after 5 minutes of inactivity.
The RFC states that the ACK packet sent need to use the SEQ NUMBER - 1 as ack number
so this script sniff the connections to identify which is the one for we are interested
in and keep track of the SEQ NUMBER and the timestamp of the last sent packet to
simulate the behaviour of the SO.
You need to install scapy, I used scapy 2.36.
Usage:
scapy-tcp-keepalive-simulator.py DESTINATION_IP DESTINATION_PORT KEEPALIVE_PROBE_TIME_IN_MINUTES KEEPALIVE_PROBE_INTERVAL
Ie.
scapy-tcp-keepalive-simulator.py 216.58.213.196 80 3 180
Todo List
- support multiple connections
- detect connection reset / abort / end (ie. FIN/RST flags)
- refactor the code to use classes
- properly support multithreading (ie. handle properly the thread loop)
- do not print out anything, it's useless
'''
import sys
from datetime import datetime,timedelta
import time
from scapy.all import IP,TCP,send,srloop,sniff
from threading import Thread
destination_ip = sys.argv[1]
destination_port = int(sys.argv[2])
keepalive_probe_time = int(sys.argv[3])
keepalive_probe_interval = int(sys.argv[4])
source_ip = None
source_port = None
last_seq = 0
last_ack = 0
last_update = None
def sniffer_handle_packet(packet):
global last_update, last_seq, source_ip, source_port, destination_ip, destination_port
if (packet.haslayer(IP) == False) or (packet.haslayer(TCP) == False):
return
if (not packet[IP].dst == destination_ip) or (not packet[TCP].dport == destination_port):
return
source_ip = packet[IP].src
source_port = packet[TCP].sport
last_seq = packet[TCP].seq
last_ack = packet[TCP].ack
last_update = datetime.now()
return '[{}] Received TCP/IP Packet: {}:{} ==> {}:{}, flags[{}], seq {}, ack {}'.format(
datetime.now(),
packet[IP].src, packet[TCP].sport,
packet[IP].dst, packet[TCP].dport,
packet[TCP].flags, packet[TCP].seq, packet[TCP].ack)
def sniffer_thread_main():
sniff(filter="tcp", prn=sniffer_handle_packet, store=0)
def keepalive_thread_main():
global keepalive_probe_time, keepalive_probe_interval, last_update, last_seq, last_ack, source_ip, source_port, destination_ip, destination_port
while(True):
if (not last_update == None) and ((datetime.now() - timedelta(minutes=keepalive_probe_time)) > last_update):
ip=IP(src=source_ip,dst=destination_ip)
ACK=TCP(sport=source_port, dport=destination_port, flags='A', ack=last_ack, seq=last_seq - 1)
packet=ip/ACK
send(packet)
print('[{}] Sending TCP/IP Packet: {}:{} ==> {}:{}, flags[{}], seq {}, ack {}'.format(
datetime.now(),
packet[IP].src, packet[TCP].sport,
packet[IP].dst, packet[TCP].dport,
packet[TCP].flags, packet[TCP].seq, packet[TCP].ack))
time.sleep(keepalive_probe_interval)
time.sleep(1)
def app_main():
sniffer_thread = Thread(target=sniffer_thread_main)
sniffer_thread.start()
keepalive_thread = Thread(target=keepalive_thread_main)
keepalive_thread.start()
sniffer_thread.join()
keepalive_thread.join()
if __name__ == '__main__':
app_main()
@manojpapisetty
Copy link

The TCP keepalive should have same ack number as previous packet and seq number as -1 the prev seq number.

The script needs modification accordingly.

@danielealbano
Copy link
Author

The TCP keepalive should have same ack number as previous packet and seq number as -1 the prev seq number.

The script needs modification accordingly.

Hey @manojpapisetty thanks for pointing out the issue in the implementation, just fixed it!

@WinChua
Copy link

WinChua commented Nov 10, 2022

lack of last_ack

here lack of last_ack

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment