Skip to content

Instantly share code, notes, and snippets.

@SAPikachu
Created January 27, 2014 02:47
Show Gist options
  • Save SAPikachu/8642558 to your computer and use it in GitHub Desktop.
Save SAPikachu/8642558 to your computer and use it in GitHub Desktop.
OpenVPN/6in4 tunnel over IPSec (alpha)
  • Draft, very hard to configure and use
  • Please don't ask me how to use it at this moment; once it matures, I will move it to a repository and add instructions.
  • Remember to run ip xfrm state flush && ip xfrm policy flush after changing configuration and before re-running easy-ipsec.py
  • Requirements:
    • Linux
    • iproute2
    • dig
    • Python 2.7
    • PyCrypto
{
"mode": "client",
"detect_local_ip_from_route": true,
"logging": {
"level": "INFO"
},
"enc": {
"algo": "cbc(aes)",
"keysize": 32
},
"auth": {
"algo": "hmac(sha256)",
"keysize": 32
},
"pbkdf2": {
"salt": "your salt",
"iterations": 10000
},
"peers": {
"openvpn": {
"client": "your.ddns.host.name.or.public.ip",
"client_private": "your.lan.ip",
"server": "your.server.ip",
"selector": "udp {server_port} 12345",
"mode": "tunnel",
"key": "your key"
},
"6in4 tunnel": {
"client": "your.ddns.host.name.or.public.ip",
"server": "your.server.ip",
"selector": "ipv6",
"mode": "transport",
"key": "your key"
}
}
}
#!/usr/bin/env python
from __future__ import print_function
import sys
import json
import logging
import re
import subprocess
from Crypto.Hash import HMAC, SHA512
from Crypto.Protocol.KDF import PBKDF2
# Dotted-quad IPv4 pattern: four groups of 1-3 digits separated by dots.
# Digit ranges are not checked, so e.g. "999.1.1.1" also matches.
IP_REGEX = r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}"
# Module-wide logger, registered under the name "easy-ipsec".
log = logging.getLogger("easy-ipsec")
def generate_keys(config, peer, type):
    """Derive the SPI, encryption key and authentication key for a peer.

    One PBKDF2 run (with HMAC-SHA512 as the pseudo-random function) is fed
    the peer's shared key concatenated with its client, server and selector
    strings plus *type*.  The derived bytes are then cut into three slices.

    Args:
        config: Parsed configuration.  ``enc.keysize``, ``auth.keysize``,
            ``pbkdf2.salt`` and ``pbkdf2.iterations`` are read.
        peer: Peer entry.  ``key``, ``client``, ``server`` and ``selector``
            are read.
        type: Label appended to the password, so that different labels
            yield different key material for the same peer.

    Returns:
        A dict with ``"spi"`` (the first 4 derived bytes), ``"enc"`` (the
        next ``enc.keysize`` bytes) and ``"auth"`` (everything after that,
        i.e. ``auth.keysize`` bytes).
    """
    spi_size = 4
    enc_size = config["enc"]["keysize"]
    auth_size = config["auth"]["keysize"]

    def hmac_sha512(secret, data):
        return HMAC.new(secret, data, SHA512).digest()

    password = (
        peer["key"] + peer["client"] + peer["server"]
        + peer["selector"] + type
    )
    derived = PBKDF2(
        password,
        config["pbkdf2"]["salt"],
        spi_size + enc_size + auth_size,
        config["pbkdf2"]["iterations"],
        hmac_sha512,
    )

    enc_end = spi_size + enc_size
    return {
        "spi": derived[:spi_size],
        "enc": derived[spi_size:enc_end],
        "auth": derived[enc_end:],
    }
def command(cmd, regex=None, flags=0):
    """Run *cmd* through the shell and optionally extract a value from it.

    Args:
        cmd: Shell command line; it is executed with ``shell=True``.
        regex: Optional pattern containing at least one capture group.  It
            is searched for in the command's captured standard output.
        flags: ``re`` flags passed to the search.

    Returns:
        ``None`` when *regex* is empty or omitted, otherwise the text of the
        pattern's first capture group.

    Raises:
        subprocess.CalledProcessError: The command exited with a non-zero
            status.
        ValueError: *regex* was given but did not match the output.
    """
    log.debug("Command: %s", cmd)
    output = subprocess.check_output(cmd, shell=True)
    log.debug("Output: %s", output)
    if not regex:
        return None

    match = re.search(regex, output, flags=flags)
    if match is None:
        # Fail with a message naming the command instead of the opaque
        # "'NoneType' object has no attribute 'group'".
        raise ValueError(
            "Output of %r does not match pattern %r" % (cmd, regex)
        )
    return match.group(1)
def resolve_host(host):
    """Return an IPv4 address for *host*.

    A value that already is a dotted-quad address (ignoring surrounding
    whitespace) is returned stripped, without any lookup.  Anything else is
    looked up by running ``dig +trace +short`` and taking the address from
    the first output line that starts with ``A``.

    Args:
        host: IPv4 address literal or host name.

    Returns:
        The IPv4 address as a string.
    """
    candidate = host.strip()
    # The pattern must cover the whole string: re.match only pins the start,
    # so a name such as "10.0.0.1.example.org" would otherwise be taken for
    # an address literal and returned unresolved.
    if re.match(r"(?:%s)\Z" % (IP_REGEX,), candidate):
        return candidate

    # NOTE(review): "+trace" presumably avoids stale cached answers for
    # dynamic DNS names -- confirm.
    ip = command(
        "dig +trace +short %s" % (candidate,),
        r"^A +(%s)\b" % (IP_REGEX,),
        re.MULTILINE,
    )
    log.info("IP of %s: %s", candidate, ip)
    return ip
def detect_local_ip(remote):
    """Return the local source address the kernel would use to reach *remote*.

    Runs ``ip route get <remote>`` and extracts the IPv4 address that
    follows the ``src`` keyword in its output.

    Args:
        remote: Destination address handed to ``ip route get``.

    Returns:
        The source IPv4 address as a string.
    """
    route_query = "ip route get %s" % (remote,)
    src_pattern = r"\bsrc +(%s)\b" % (IP_REGEX,)
    local_ip = command(route_query, src_pattern)
    log.info("Detected local IP: %s", local_ip)
    return local_ip
def _hex_literal(key):
    """Render a byte string as a ``0x``-prefixed lowercase hex literal."""
    # Iterating yields ints for Python 3 bytes and 1-character strings
    # otherwise; accept both.
    return "0x" + "".join(
        "%02x" % (b if isinstance(b, int) else ord(b),) for b in key
    )


def build_ipsec_commands(
    config, peer, local, remote, remote_private, keys, direction,
):
    """Build the ``ip xfrm`` command lines for one direction of one peer.

    Args:
        config: Parsed configuration.  ``mode``, ``enc.algo`` and
            ``auth.algo`` are read.
        peer: Peer entry.  ``mode`` and ``selector`` are read; the selector
            may contain ``{server_port}`` and ``{client_port}``
            placeholders, which are replaced by ``sport`` or ``dport``.
        local: Local endpoint address.
        remote: Remote endpoint address used in the state and the template.
        remote_private: Remote address used in the policy selector.
        keys: Dict with ``"spi"``, ``"enc"`` and ``"auth"`` byte strings.
        direction: ``"out"`` or ``"in"``.

    Returns:
        A list of two shell command strings: ``ip xfrm state add ...``
        followed by ``ip xfrm policy add ...``.

    Raises:
        ValueError: *direction* is neither ``"out"`` nor ``"in"``.
    """
    if direction == "out":
        # Outgoing traffic: we are the source, the peer is the destination.
        remote_param, local_param = "dst", "src"
        local_port, remote_port = "sport", "dport"
    elif direction == "in":
        remote_param, local_param = "src", "dst"
        local_port, remote_port = "dport", "sport"
    else:
        # An explicit exception rather than ``assert``, which is stripped
        # when Python runs with -O.
        raise ValueError(
            'direction must be "out" or "in", got %r' % (direction,)
        )

    # A placeholder named after our own role refers to the local port,
    # the other one to the remote port.
    mode = config["mode"]
    selector = peer["selector"].format(
        server_port=local_port if mode == "server" else remote_port,
        client_port=local_port if mode == "client" else remote_port,
    )

    params = {
        "local": local,
        "remote": remote,
        "remote_private": remote_private,
        "spi": _hex_literal(keys["spi"]),
        "mode": peer["mode"],
        "enc_algo": config["enc"]["algo"],
        "enc_key": _hex_literal(keys["enc"]),
        "auth_algo": config["auth"]["algo"],
        "auth_key": _hex_literal(keys["auth"]),
        "direction": direction,
        "remote_param": remote_param,
        "local_param": local_param,
        "selector": selector,
    }

    state_template = (
        "ip xfrm state add {remote_param} {remote} {local_param} {local} "
        "proto esp spi {spi} mode {mode} enc '{enc_algo}' {enc_key} "
        "auth '{auth_algo}' {auth_key} "
        "sel proto {selector}"
    )
    policy_template = (
        "ip xfrm policy add dir {direction} "
        "{remote_param} {remote_private} {local_param} {local} "
        "proto {selector} tmpl {remote_param} {remote} {local_param} {local} "
        "proto esp spi {spi} mode {mode}"
    )
    return [
        template.format(**params)
        for template in (state_template, policy_template)
    ]
def process_peer(config, peer):
    """Install the IPSec states and policies for one peer entry.

    Determines the local and remote endpoints from ``config["mode"]``,
    resolves them to addresses, builds the ``ip xfrm`` commands for the
    outgoing and the incoming direction (each with its own derived keys)
    and runs them through the shell.

    Args:
        config: Parsed configuration.  ``mode`` and
            ``detect_local_ip_from_route`` are read here; the rest is
            passed on to the key and command builders.
        peer: Peer entry with ``client``, ``server`` and ``selector``, and
            optionally ``client_private`` (only consulted in server mode).
    """
    if config["mode"] == "server":
        local, remote = peer["server"], peer["client"]
        # Fall back to the client's configured address when no private
        # address is given.
        remote_private = peer.get("client_private", remote)
        type_out, type_in = "server-to-client", "client-to-server"
    else:
        local, remote = peer["client"], peer["server"]
        remote_private = remote
        type_out, type_in = "client-to-server", "server-to-client"
    # NOTE(review): remote_private is captured before DNS resolution, so a
    # host name is handed to the policy selector verbatim -- confirm that
    # this is intended.

    remote = resolve_host(remote)
    if config["detect_local_ip_from_route"]:
        local = detect_local_ip(remote)
    else:
        local = resolve_host(local)

    commands = []
    for direction, key_type in (("out", type_out), ("in", type_in)):
        commands.extend(build_ipsec_commands(
            config, peer,
            local=local,
            remote=remote,
            remote_private=remote_private,
            keys=generate_keys(config, peer, key_type),
            direction=direction,
        ))

    for cmd in commands:
        status = subprocess.call(cmd, shell=True)
        if status != 0:
            # Report only the leading words: the full command line
            # contains key material.
            log.warning(
                "'%s ...' exited with status %s",
                " ".join(cmd.split()[:4]), status,
            )

    log.info("Configured IPSec for %s, %s (%s)",
             local, remote, peer["selector"])
def validate_config(config):
    """Check that the configuration's ``mode`` is a supported value.

    Args:
        config: Parsed configuration; only ``config["mode"]`` is inspected.

    Returns:
        True when the mode is ``"client"`` or ``"server"``.  Otherwise an
        error is logged and False is returned.
    """
    mode_is_valid = config["mode"] in ("client", "server")
    if not mode_is_valid:
        log.error("mode must be client or server")
    return mode_is_valid
def main():
    """Load ``config.json`` from the working directory and configure all peers.

    The ``logging`` section of the configuration is passed to
    ``logging.basicConfig`` as keyword arguments.  The process exits with
    status 1 when the configuration fails validation; otherwise every entry
    under ``peers`` is processed.
    """
    with open("config.json", "r") as config_file:
        config = json.load(config_file)
    logging.basicConfig(**config["logging"])

    if not validate_config(config):
        sys.exit(1)

    # Peer names only label the entries; the settings are in the values.
    for _name, peer in config["peers"].iteritems():
        process_peer(config, peer)


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment