Skip to content

Instantly share code, notes, and snippets.

@remittor
Last active March 4, 2025 08:07
Show Gist options
  • Save remittor/8c3d9ff293b2ba4b13c367cc1a69f9eb to your computer and use it in GitHub Desktop.
Save remittor/8c3d9ff293b2ba4b13c367cc1a69f9eb to your computer and use it in GitHub Desktop.
# SPDX-License-Identifier: MIT
# Author: remittor <[email protected]>
# Created: 2024
import os
import sys
import glob
import subprocess
import optparse
import random
import datetime
g_main_config_src = '.main.config'
g_main_config_fn = None
g_main_config_type = None
g_defclient_config_fn = "_defclient.config"
parser = optparse.OptionParser("usage: %prog [options]")
parser.add_option("-t", "--tmpcfg", dest="tmpcfg", default = g_defclient_config_fn)
parser.add_option("-c", "--conf", dest="confgen", action="store_true", default = False)
parser.add_option("-q", "--qrcode", dest="qrcode", action="store_true", default = False)
parser.add_option("-a", "--add", dest="addcl", default = "")
parser.add_option("-u", "--update", dest="update", default = "")
parser.add_option("-d", "--delete", dest="delete", default = "")
parser.add_option("-i", "--ipaddr", dest="ipaddr", default = "")
parser.add_option("-p", "--port", dest="port", default = None, type = 'int')
parser.add_option("", "--make", dest="makecfg", default = "")
parser.add_option("", "--tun", dest="tun", default = "")
parser.add_option("", "--create", dest="create", action="store_true", default = False)
(opt, args) = parser.parse_args()
g_defserver_config = """
[Interface]
#_GenKeyTime = <SERVER_KEY_TIME>
PrivateKey = <SERVER_PRIVATE_KEY>
#_PublicKey = <SERVER_PUBLIC_KEY>
Address = <SERVER_ADDR>
ListenPort = <SERVER_PORT>
Jc = <JC>
Jmin = <JMIN>
Jmax = <JMAX>
S1 = <S1>
S2 = <S2>
H1 = <H1>
H2 = <H2>
H3 = <H3>
H4 = <H4>
PostUp = iptables -A INPUT -p udp --dport <SERVER_PORT> -m conntrack --ctstate NEW -j ACCEPT --wait 10 --wait-interval 50; iptables -A FORWARD -i <SERVER_IFACE> -o <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; iptables -A FORWARD -i <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; iptables -t nat -A POSTROUTING -o <SERVER_IFACE> -j MASQUERADE --wait 10 --wait-interval 50; ip6tables -A FORWARD -i <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; ip6tables -t nat -A POSTROUTING -o <SERVER_IFACE> -j MASQUERADE --wait 10 --wait-interval 50
PostDown = iptables -D INPUT -p udp --dport <SERVER_PORT> -m conntrack --ctstate NEW -j ACCEPT --wait 10 --wait-interval 50; iptables -D FORWARD -i <SERVER_IFACE> -o <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; iptables -D FORWARD -i <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; iptables -t nat -D POSTROUTING -o <SERVER_IFACE> -j MASQUERADE --wait 10 --wait-interval 50; ip6tables -D FORWARD -i <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; ip6tables -t nat -D POSTROUTING -o <SERVER_IFACE> -j MASQUERADE --wait 10 --wait-interval 50
"""
g_defclient_config = """
[Interface]
PrivateKey = <CLIENT_PRIVATE_KEY>
#_PublicKey = <CLIENT_PUBLIC_KEY>
Address = <CLIENT_TUNNEL_IP>
DNS = 8.8.8.8
Jc = <JC>
Jmin = <JMIN>
Jmax = <JMAX>
S1 = <S1>
S2 = <S2>
H1 = <H1>
H2 = <H2>
H3 = <H3>
H4 = <H4>
[Peer]
AllowedIPs = 0.0.0.0/5, 8.0.0.0/7, 11.0.0.0/8, 12.0.0.0/6, 16.0.0.0/4, 32.0.0.0/3, 64.0.0.0/2, 128.0.0.0/3, 160.0.0.0/5, 168.0.0.0/6, 172.0.0.0/12, 172.32.0.0/11, 172.64.0.0/10, 172.128.0.0/9, 173.0.0.0/8, 174.0.0.0/7, 176.0.0.0/4, 192.0.0.0/9, 192.128.0.0/11, 192.160.0.0/13, 192.169.0.0/16, 192.170.0.0/15, 192.172.0.0/14, 192.176.0.0/12, 192.192.0.0/10, 193.0.0.0/8, 194.0.0.0/7, 196.0.0.0/6, 200.0.0.0/5, 208.0.0.0/4, 8.8.8.8/32
Endpoint = <SERVER_ADDR>:<SERVER_PORT>
PersistentKeepalive = 60
PublicKey = <SERVER_PUBLIC_KEY>
"""
class IPAddr():
def __init__(self, ipaddr = None):
self.ip = [ 0, 0, 0, 0 ]
self.mask = None
if ipaddr:
self.init(ipaddr)
def init(self, ipaddr):
_ipaddr = ipaddr
if not ipaddr:
raise RuntimeError(f'ERROR: Incorrect IP-Addr: "{_ipaddr}"')
if ' ' in ipaddr:
raise RuntimeError(f'ERROR: Incorrect IP-Addr: "{_ipaddr}"')
if ',' in ipaddr:
raise RuntimeError(f'ERROR: Incorrect IP-Addr: "{_ipaddr}"')
self.ip = [ 0, 0, 0, 0 ]
self.mask = None
if '/' in ipaddr:
self.mask = int(ipaddr.split('/')[1])
ipaddr = ipaddr.split('/')[0]
nlist = ipaddr.split('.')
if len(nlist) != 4:
raise RuntimeError(f'ERROR: Incorrect IP-addr: "{_ipaddr}"')
for n, num in enumerate(nlist):
self.ip[n] = int(num)
def __str__(self):
out = f'{self.ip[0]}.{self.ip[1]}.{self.ip[2]}.{self.ip[3]}'
if self.mask:
out += '/' + str(self.mask)
return out
class WGConfig():
def __init__(self, filename = None):
self.lines = [ ]
self.iface = { }
self.peer = { }
self.idsline = { }
self.cfg_fn = None
if filename:
self.load(filename)
def load(self, filename):
self.cfg_fn = None
self.lines = [ ]
self.iface = { }
self.peer = { }
self.idsline = { }
with open(filename, 'r') as file:
lines = file.readlines()
iface = None
secdata = [ ]
secdata_item = None
secline = [ ]
secline_item = None
for n, line in enumerate(lines):
line = line.rstrip()
self.lines.append(line)
if line.strip() == '':
continue
if line.startswith(' ') and not line.strip().startswith('#'):
raise RuntimeError(f'ERROR_CFG: Incorrect line #{n} into config "{filename}"')
if line.startswith('#') and not line.startswith('#_'):
continue
if line.startswith('[') and line.endswith(']'):
section_name = line[1:-1]
if not section_name:
raise RuntimeError(f'ERROR_CFG: Incorrect section name: "{section_name}" (#{n+1})')
#print(secname)
secdata_item = { "_section_name": section_name.lower() }
secline_item = { "_section_name": n }
if section_name.lower() == 'interface':
if iface:
raise RuntimeError(f'ERROR_CFG: Found second section Interface in line #{n+1}')
iface = secdata_item
elif section_name.lower() == 'peer':
pass
else:
raise RuntimeError(f'ERROR_CFG: Found incorrect section "{section_name}" in line #{n+1}')
secdata.append(secdata_item)
secline.append(secline_item)
continue
if line.startswith('#_') and ' = ' in line:
line = line[2:]
if line.startswith('#'):
continue
if ' = ' not in line:
raise RuntimeError(f'ERROR_CFG: Incorrect line into config: "{line}" (#{n+1})')
xv = line.find(' = ')
if xv <= 0:
raise RuntimeError(f'ERROR_CFG: Incorrect line into config: "{line}" (#{n+1})')
vname = line[:xv].strip()
value = line[xv+3:].strip()
#print(f' "{vname}" = "{value}"')
if not secdata_item:
raise RuntimeError(f'ERROR_CFG: Parameter "{vname}" have unknown section! (#{n+1})')
section_name = secdata_item['_section_name']
if vname in secdata_item:
raise RuntimeError(f'ERROR_CFG: Found duplicate of param "{vname}" into section "{section_name}" (#{n+1})')
secdata_item[vname] = value
secline_item[vname] = n
if not iface:
raise RuntimeError(f'ERROR_CFG: Cannot found section Interface!')
for i, item in enumerate(secdata):
line = secline[i]
peer_name = ""
if item['_section_name'] == 'interface':
self.iface = item
peer_name = "__this_server__"
if 'PublicKey' not in item:
raise RuntimeError(f'ERROR_CFG: Cannot found PublicKey in Interface')
if 'PrivateKey' not in item:
raise RuntimeError(f'ERROR_CFG: Cannot found PrivateKey in Interface')
else:
if 'Name' in item:
peer_name = item['Name']
if not peer_name:
raise RuntimeError(f'ERROR_CFG: Invalid peer Name in line #{line["Name"]}')
elif 'PublicKey' in item:
peer_name = item['PublicKey']
if not peer_name:
raise RuntimeError(f'ERROR_CFG: Invalid peer PublicKey in line #{line["PublicKey"]}')
else:
raise RuntimeError(f'ERROR_CFG: Invalid peer data in line #{line["_section_name"]}')
if 'AllowedIPs' not in item:
raise RuntimeError(f'ERROR_CFG: Cannot found "AllowedIPs" into peer "{peer_name}"')
if peer_name in self.peer:
raise RuntimeError(f'ERROR_CFG: Found duplicate peer with name "{peer_name}"')
self.peer[peer_name] = item
if peer_name in self.idsline:
raise RuntimeError(f'ERROR_CFG: Found duplicate peer with name "{peer_name}"')
min_line = line['_section_name']
max_line = min_line
self.idsline[f'{peer_name}'] = min_line
for vname in item:
self.idsline[f'{peer_name}|{vname}'] = line[vname]
if line[vname] > max_line:
max_line = line[vname]
item['_lines_range'] = ( min_line, max_line )
self.cfg_fn = filename
return len(self.peer)
def save(self, filename = None):
if not filename:
filename = self.cfg_fn
if not self.lines:
raise RuntimeError(f'ERROR: no data')
with open(filename, 'w', newline = '\n') as file:
for line in self.lines:
file.write(line + '\n')
def del_client(self, c_name):
if c_name not in self.peer:
raise RuntimeError(f'ERROR: Not found client "{c_name}" in peer list!')
client = self.peer[c_name]
ipaddr = client['AllowedIPs']
min_line, max_line = client['_lines_range']
del self.lines[min_line:max_line+1]
del self.peer[c_name]
secsize = max_line - min_line + 1
del_list = [ ]
for k, v in self.idsline.items():
if v >= min_line and v <= max_line:
del_list.append(k)
elif v > max_line:
self.idsline[k] = v - secsize
for k in del_list:
del self.idsline[k]
return ipaddr
def set_param(self, c_name, param_name, param_value, force = False, offset = 0):
if c_name not in self.peer:
raise RuntimeError(f'ERROR: Not found client "{c_name}" in peer list!')
line_prefix = ""
if param_name.startswith('_'):
line_prefix = "#_"
param_name = param_name[1:]
client = self.peer[c_name]
min_line, max_line = client['_lines_range']
if param_name in client:
nline = self.idsline[f'{c_name}|{param_name}']
line = self.lines[nline]
if line.startswith('#_'):
line_prefix = "#_"
self.lines[nline] = f'{line_prefix}{param_name} = {param_value}'
return
if not force:
raise RuntimeError(f'ERROR: Param "{param_name}" not found for client "{c_name}"')
new_line = f'{line_prefix}{param_name} = {param_value}'
client[param_name] = param_value
secsize = max_line - min_line + 1
if offset >= secsize:
raise RuntimeError(f'ERROR: Incorrect offset value = {offset} (secsize = {secsize})')
pos = max_line + 1 if offset <= 0 else min_line + offset
for k, v in self.idsline.items():
if v >= pos:
self.idsline[k] = v + 1
self.idsline[f'{c_name}|{param_name}'] = pos
self.lines.insert(pos, new_line)
return
def exec_cmd(cmd, input = None, shell = True, check = True, timeout = None):
proc = subprocess.run(cmd, input = input, shell = shell, check = check,
timeout = timeout, encoding = 'utf8',
stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
rc = proc.returncode
out = proc.stdout
return rc, out
def get_main_iface():
rc, out = exec_cmd('ip link show')
if rc:
raise RuntimeError(f'ERROR: Cannot get net interfaces')
for line in out.split('\n'):
if '<BROADCAST' in line and 'state UP' in line:
xv = line.split(':')
return xv[1].strip()
return None
def get_ext_ipaddr():
rc, out = exec_cmd('curl -4 -s icanhazip.com')
if rc:
raise RuntimeError(f'ERROR: Cannot get ext IP-Addr')
lines = out.split('\n')
ipaddr = lines[-1] if lines[-1] else lines[-2]
ipaddr = IPAddr(ipaddr)
return str(ipaddr)
def gen_pair_keys(cfg_type = None):
global g_main_config_type
if sys.platform == 'win32':
return 'client_priv_key', 'client_pub_key'
if not cfg_type:
cfg_type = g_main_config_type
if not cfg_type:
raise
wgtool = cfg_type.lower()
rc, out = exec_cmd(f'{wgtool} genkey')
if rc:
raise RuntimeError(f'ERROR: Cannot generate private key')
priv_key = out.strip()
if not priv_key:
raise RuntimeError(f'ERROR: Cannot generate private Key')
rc, out = exec_cmd(f'{wgtool} pubkey', input = priv_key + '\n')
if rc:
raise RuntimeError(f'ERROR: Cannot generate public key')
pub_key = out.strip()
if not pub_key:
raise RuntimeError(f'ERROR: Cannot generate public Key')
return priv_key, pub_key
def get_main_config_path(check = True):
global g_main_config_fn
global g_main_config_type
if not os.path.exists(g_main_config_src):
raise RuntimeError(f'ERROR: file "{g_main_config_src}" not found!')
with open(g_main_config_src, 'r') as file:
lines = file.readlines()
g_main_config_fn = lines[0].strip()
cfg_exists = os.path.exists(g_main_config_fn)
g_main_config_type = 'WG'
if os.path.basename(g_main_config_fn).startswith('a'):
g_main_config_type = 'AWG'
if check:
if not cfg_exists:
raise RuntimeError(f'ERROR: Main {g_main_config_type} config file "{g_main_config_fn}" not found!')
return g_main_config_fn
# -------------------------------------------------------------------------------------
if opt.makecfg:
g_main_config_fn = opt.makecfg
if os.path.exists(g_main_config_fn):
raise RuntimeError(f'ERROR: file "{g_main_config_fn}" already exists!')
m_cfg_type = 'WG'
if os.path.basename(g_main_config_fn).startswith('a'):
m_cfg_type = 'AWG'
print(f'Make {m_cfg_type} server config: "{g_main_config_fn}"...')
main_iface = get_main_iface()
if not main_iface:
raise RuntimeError(f'ERROR: Cannot get main network interface!')
print(f'Main network iface: "{main_iface}"')
if opt.port <= 1000 or opt.port > 65530:
raise RuntimeError(f'ERROR: Incorrect argument port = {opt.port}')
if not opt.ipaddr:
raise RuntimeError(f'ERROR: Incorrect argument ipaddr = "{opt.ipaddr}"')
ipaddr = IPAddr(opt.ipaddr)
if not ipaddr.mask:
raise RuntimeError(f'ERROR: Incorrect argument ipaddr = "{opt.ipaddr}"')
if opt.tun:
tun_name = opt.tun
else:
cfg_name = os.path.basename(g_main_config_fn)
tun_name = os.path.splitext(cfg_name)[0].strip()
print(f'Tunnel iface: "{tun_name}"')
priv_key, pub_key = gen_pair_keys(m_cfg_type)
random.seed()
jc = random.randint(3, 127)
jmin = random.randint(3, 700)
jmax = random.randint(jmin+1, 1270)
out = g_defserver_config
out = out.replace('<SERVER_KEY_TIME>', datetime.datetime.now().isoformat())
out = out.replace('<SERVER_PRIVATE_KEY>', priv_key)
out = out.replace('<SERVER_PUBLIC_KEY>', pub_key)
out = out.replace('<SERVER_ADDR>', str(ipaddr))
out = out.replace('<SERVER_PORT>', str(opt.port))
if m_cfg_type == 'AWG':
out = out.replace('<JC>', str(jc))
out = out.replace('<JMIN>', str(jmin))
out = out.replace('<JMAX>', str(jmax))
out = out.replace('<S1>', str(random.randint(3, 127)))
out = out.replace('<S2>', str(random.randint(3, 127)))
out = out.replace('<H1>', str(random.randint(0x10000011, 0x7FFFFF00)))
out = out.replace('<H2>', str(random.randint(0x10000011, 0x7FFFFF00)))
out = out.replace('<H3>', str(random.randint(0x10000011, 0x7FFFFF00)))
out = out.replace('<H4>', str(random.randint(0x10000011, 0x7FFFFF00)))
else:
out = out.replace('\nJc = <' , '\n# ')
out = out.replace('\nJmin = <', '\n# ')
out = out.replace('\nJmax = <', '\n# ')
out = out.replace('\nS1 = <' , '\n# ')
out = out.replace('\nS2 = <' , '\n# ')
out = out.replace('\nH1 = <' , '\n# ')
out = out.replace('\nH2 = <' , '\n# ')
out = out.replace('\nH3 = <' , '\n# ')
out = out.replace('\nH4 = <' , '\n# ')
out = out.replace('<SERVER_IFACE>', main_iface)
out = out.replace('<SERVER_TUN>', tun_name)
with open(g_main_config_fn, 'w', newline = '\n') as file:
file.write(out)
print(f'{m_cfg_type} server config file "{g_main_config_fn}" created!')
with open(g_main_config_src, 'w', newline = '\n') as file:
file.write(g_main_config_fn)
sys.exit(0)
# -------------------------------------------------------------------------------------
get_main_config_path(check = True)
if opt.create:
if os.path.exists(opt.tmpcfg):
raise RuntimeError(f'ERROR: file "{opt.tmpcfg}" already exists!')
print(f'Create template for client configs: "{opt.tmpcfg}"...')
os.remove(opt.tmpcfg) if os.path.exists(opt.tmpcfg) else None
if opt.ipaddr:
ipaddr = opt.ipaddr
else:
ext_ipaddr = get_ext_ipaddr()
print(f'External IP-Addr: "{ext_ipaddr}"')
ipaddr = ext_ipaddr
ipaddr = IPAddr(ipaddr)
if ipaddr.mask:
raise RuntimeError(f'ERROR: Incorrect argument ipaddr = "{opt.ipaddr}"')
print(f'Server IP-Addr: "{ipaddr}"')
out = g_defclient_config
out = out.replace('<SERVER_ADDR>', str(ipaddr))
if g_main_config_type != 'AWG':
out = out.replace('\nJc = <' , '\n# ')
out = out.replace('\nJmin = <', '\n# ')
out = out.replace('\nJmax = <', '\n# ')
out = out.replace('\nS1 = <' , '\n# ')
out = out.replace('\nS2 = <' , '\n# ')
out = out.replace('\nH1 = <' , '\n# ')
out = out.replace('\nH2 = <' , '\n# ')
out = out.replace('\nH3 = <' , '\n# ')
out = out.replace('\nH4 = <' , '\n# ')
with open(opt.tmpcfg, 'w', newline = '\n') as file:
file.write(out)
print(f'Template client config file "{opt.tmpcfg}" created!')
sys.exit(0)
# -------------------------------------------------------------------------------------
xopt = [ opt.addcl, opt.update, opt.delete ]
copt = [ x for x in xopt if len(x) > 0 ]
if copt and len(copt) >= 2:
raise RuntimeError(f'ERROR: Incorrect arguments! Too many actions!')
if opt.addcl:
cfg = WGConfig(g_main_config_fn)
srv = cfg.iface
c_name = opt.addcl
print(f'Add new client config "{c_name}"...')
max_addr = None
for peer_name, peer in cfg.peer.items():
if peer_name.lower() == c_name.lower():
raise RuntimeError(f'ERROR: peer with name "{c_name}" already exists!')
if opt.ipaddr:
addr = IPAddr(opt.ipaddr)
addr.mask = None
addr = str(addr)
p_addr = IPAddr(peer['AllowedIPs'])
p_addr.mask = None
p_addr = str(p_addr)
if addr == p_addr:
raise RuntimeError(f'ERROR: IP-addr "{opt.ipaddr}" already used!')
addr = IPAddr(peer['AllowedIPs'])
if not max_addr or addr.ip[3] > max_addr.ip[3]:
max_addr = addr
priv_key, pub_key = gen_pair_keys()
with open(g_main_config_fn, 'rb') as file:
srvcfg = file.read()
srvcfg = srvcfg.decode('utf8')
if opt.ipaddr:
ipaddr = opt.ipaddr
else:
if max_addr is None:
max_addr = IPAddr(srv['Address'])
max_addr.ip[3] += 1
max_addr.mask = 32
ipaddr = str(max_addr)
else:
max_addr.ip[3] += 1
ipaddr = str(max_addr)
if max_addr.ip[3] >= 254:
raise RuntimeError(f'ERROR: There are no more free IP-addresses')
srvcfg += f'\n'
srvcfg += f'[Peer]\n'
srvcfg += f'#_Name = {c_name}\n'
srvcfg += f'#_GenKeyTime = {datetime.datetime.now().isoformat()}\n'
srvcfg += f'#_PrivateKey = {priv_key}\n'
srvcfg += f'PublicKey = {pub_key}\n'
srvcfg += f'AllowedIPs = {ipaddr}\n'
with open(g_main_config_fn, 'w', newline = '\n') as file:
file.write(srvcfg)
print(f'New client "{c_name}" added! IP-Addr: "{ipaddr}"')
if opt.update:
cfg = WGConfig(g_main_config_fn)
p_name = opt.update
print(f'Update keys for client "{p_name}"...')
priv_key, pub_key = gen_pair_keys()
cfg.set_param(p_name, '_PrivateKey', priv_key, force = True, offset = 2)
cfg.set_param(p_name, 'PublicKey', pub_key)
gentime = datetime.datetime.now().isoformat()
cfg.set_param(p_name, '_GenKeyTime', gentime, force = True, offset = 2)
ipaddr = cfg.peer[p_name]['AllowedIPs']
cfg.save()
print(f'Keys for client "{p_name}" updated! IP-Addr: "{ipaddr}"')
if opt.delete:
cfg = WGConfig(g_main_config_fn)
p_name = opt.delete
print(f'Delete client "{p_name}"...')
ipaddr = cfg.del_client(p_name)
cfg.save()
print(f'Client "{p_name}" deleted! IP-Addr: "{ipaddr}"')
if opt.confgen:
cfg = WGConfig(g_main_config_fn)
srv = cfg.iface
print('Generate client configs...')
if not os.path.exists(opt.tmpcfg):
raise RuntimeError(f'ERROR: file "{opt.tmpcfg}" not found!')
with open(opt.tmpcfg, 'r') as file:
tmpcfg = file.read()
flst = glob.glob("*.conf")
for fn in flst:
if fn.endswith('awg0.conf'):
continue
if os.path.exists(fn):
os.remove(fn)
flst = glob.glob("*.png")
for fn in flst:
if os.path.exists(fn):
os.remove(fn)
random.seed()
for peer_name, peer in cfg.peer.items():
if 'Name' not in peer or 'PrivateKey' not in peer:
print(f'Skip peer with pubkey "{peer["PublicKey"]}"')
continue
jc = random.randint(3, 127)
jmin = random.randint(3, 700)
jmax = random.randint(jmin+1, 1270)
out = tmpcfg[:]
out = out.replace('<CLIENT_PRIVATE_KEY>', peer['PrivateKey'])
out = out.replace('<CLIENT_PUBLIC_KEY>', peer['PublicKey'])
out = out.replace('<CLIENT_TUNNEL_IP>', peer['AllowedIPs'])
out = out.replace('<JC>', str(jc))
out = out.replace('<JMIN>', str(jmin))
out = out.replace('<JMAX>', str(jmax))
out = out.replace('<S1>', srv['S1'])
out = out.replace('<S2>', srv['S2'])
out = out.replace('<H1>', srv['H1'])
out = out.replace('<H2>', srv['H2'])
out = out.replace('<H3>', srv['H3'])
out = out.replace('<H4>', srv['H4'])
out = out.replace('<SERVER_PORT>', srv['ListenPort'])
out = out.replace('<SERVER_PUBLIC_KEY>', srv['PublicKey'])
fn = f'{peer_name}.conf'
with open(fn, 'w', newline = '\n') as file:
file.write(out)
if opt.qrcode:
print('Generate QR codes...')
flst = glob.glob("*.png")
for fn in flst:
if os.path.exists(fn):
os.remove(fn)
flst = glob.glob("*.conf")
if not flst:
raise RuntimeError(f'ERROR: client configs not founded!')
import qrcode
for fn in flst:
if fn.endswith('awg0.conf'):
continue
with open(fn, 'rb') as file:
conf = file.read()
conf = conf.decode('utf8')
name = os.path.splitext(fn)[0]
img = qrcode.make(conf)
img.save(f'{name}.png')
print('===== OK =====')
@remittor
Copy link
Author

Пример использования:

Перед поднятием интерфейса awg0 создаём основной конфиг (сервер будет прослушивать UDP порт 49666):

python3 awgcfg.py --make /etc/amnezia/amneziawg/awg0.conf -i 10.9.9.1/24 -p 49666 

Примечание: Если вы используете не AWG, а обычный WG, то путь к конфигу должен быть таким: /etc/wireguard/wg0.conf

Создаём шаблон для клиентских конфигов:

python3 awgcfg.py --create

Добавляем в основной конфиг два клиента с именами "my_phone" и "my_router":

python3 awgcfg.py -a "my_phone"
python3 awgcfg.py -a "my_router"

Удаляем из основного конфига клиента с именем "my_phone":

python3 awgcfg.py -d "my_phone"

Перегенерируем пару ключей у клиента с именем "my_router":

python3 awgcfg.py -u "my_router"

Создаём в текущей директории полный список конфигов для всех клиентов, а так же QR-коды для каждого (png-файлы):

python3 awgcfg.py -c -q

Примечание: после выполнения команд, которые изменяют конфиг WG, нужно вручную перезапускать awg0 (или wg0) интерфейс (либо перезапускать сервис, который его поднимает).

Примечание: последнюю версию скрипта можно скачать такой командой:

wget -O awgcfg.py https://gist.githubusercontent.com/remittor/8c3d9ff293b2ba4b13c367cc1a69f9eb/raw/awgcfg.py

@sweet-jelly
Copy link

sweet-jelly commented Sep 22, 2024

Здравствуйте, хочу использовать ваш скрипт в своем проекте. Не моли бы вы лицензировать его под пермиссивной или копилефтной лицензией (MIT, BSD, GPL, да хоть AGPL)?

@EugeneTM
Copy link

Скажите, а PresharedKey не предусмотрен?

@remittor
Copy link
Author

Скажите, а PresharedKey не предусмотрен?

Не предусмотрел, т.к. не разобрался для чего он нужен.

@karavan
Copy link

karavan commented Nov 6, 2024

Строка 334:
rc, out = exec_cmd('curl -4 icanhazip.com')
Желательно заменить на:
rc, out = exec_cmd('curl -4 -s icanhazip.com')

Иначе функция ломается при запуске скрипта в контейнере.

@pashakrapivin
Copy link

pashakrapivin commented Dec 15, 2024

Привет!
Почему то конфиги не хотят работать на мобильном интернете, но корректно работают под wifi. Пробовал устанавливать на разные vps - результат одинаков
Подскажите, с чем это может быть связяно.
При этом если создаю конфиг для AWG на тех же vps через приложение AmneziaVPN - всё ок

@Vitaly-ZS
Copy link

подскажите, как создать шаблон конфигов, если eth0 не имеет IPv4 адреса, а VPS находится за NAT с публичным IPv4 и предоткрытими портами?

@karavan
Copy link

karavan commented Dec 16, 2024

@Vitaly-ZS
Если я правильно проблему, то вам поможет запуск такой команды:
curl -s ipinfo.io/ip
или такой:
curl -s ifconfig.me/ip

@Vitaly-ZS
Copy link

Привет, @karavan
спасибо, что откликнулся.
возможно я не достаточно подробно описал проблему
я иду по флоу и запускаю команду:

# python3 awgcfg.py --create
Create template for client configs: "_defclient.config"...
Traceback (most recent call last):
  File "/root/awg/awgcfg.py", line 490, in <module>
    ext_ipaddr = get_ext_ipaddr()
  File "/root/awg/awgcfg.py", line 334, in get_ext_ipaddr
    rc, out = exec_cmd('curl -4 -s icanhazip.com')
  File "/root/awg/awgcfg.py", line 314, in exec_cmd
    proc = subprocess.run(cmd, input = input, shell = shell, check = check,
  File "/usr/lib/python3.10/subprocess.py", line 526, in run
    raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command 'curl -4 -s icanhazip.com' returned non-zero exit status 7.

полагаю, что проблема в том, что в моей конфигурации VPS нет выделенного IPv4, как видно из:

# ip a
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1400 qdisc fq_codel state UP group default qlen 1000
    link/ether yyyyyyy brd ff:ff:ff:ff:ff:ff
    altname enp0s3
    inet6 xxxxxx/64 scope global
       valid_lft forever preferred_lft forever
    inet6 zzzzzzzz/64 scope link
       valid_lft forever preferred_lft forever

но поставщик VPS предоставил мне публичный NAT IPv4 и десяток портов, которые переадресовывают на мой VPS.

Вопрос, как мне создать _defclient.config, если скрипт не может получить адреса? Как вручную указать нужный IP?

@karavan
Copy link

karavan commented Dec 16, 2024

@Vitaly-ZS
26-я строчка скрипта указывает параметр, через который можно передать IP сервера

@romankurbatov
Copy link

romankurbatov commented Dec 24, 2024

@pashakrapivin Здравствуйте! У меня была такая же проблема на МТС. Попробуйте уменьшить значения Jc/Jmin/Jmax и на сервере, и на клиенте. Например, взять Jc=4, Jmin=10, Jmax=50. Скорее всего, что-то близкое задано и в AmneziaVPN, там можно посмотреть значения параметров. Вот ещё здесь ту же проблему обсуждают: amnezia-vpn/amnezia-client#1010

@pashakrapivin
Copy link

@pashakrapivin Здравствуйте! У меня была такая же проблема на МТС. Попробуйте уменьшить значения Jc/Jmin/Jmax и на сервере, и на клиенте. Например, взять Jc=4, Jmin=10, Jmax=50. Скорее всего, что-то близкое задано и в AmneziaVPN, там можно посмотреть значения параметров. Вот ещё здесь ту же проблему обсуждают: amnezia-vpn/amnezia-client#1010

Спасибо большое, это сработало. Подправил на

random.seed()
    jc = random.randint(3, 10)
    jmin = random.randint(3, 20)
    jmax = random.randint(jmin+1, 50)

AmneziaVPN ставит значения Jc = 3, Jmin = 10, Jmax = 50

@Zerogoki00
Copy link

Вы в скрипте во-первых забыли IPv6 в AllowedIPs поставить, будет его утечка
Во-вторых достаточно 0.0.0.0/0 и ::/0 поставить и всё равно будет правильно работать

@VitalyArtemiev
Copy link

А зачем 8.8.8.8/32 в allowed-ips? Вы собираетесь клиент разворачивать на гугловских серверах?

@scorpeus
Copy link

scorpeus commented Feb 6, 2025

Все работает, только к автору несколько вопросов?

  1. Почему в клиентском AllowedIPs (строка 73) именно такие адреса? По какому принципу они отбирались. По дефолту же 0.0.0.0/0, ::0 должно быть, чтоб весь трафик залетал в туннель.
  2. Почему DNS 8.8.8.8 именно гугловский? Я понимаю что все эти значения можно изменять кому как нравится, но просто интересно почему автор выбрал именно этот DNS, может там своя философия?
  3. Jc Jmin Jmax S1 S2 H1 H2 H3 H4 Эти значения должны быть на сервере и на клиенте одинаковые? А если я раздам всем своим друзьям конфиги клиентские и у кого-то не будет работать как у парней выше провайдер МТС. Нужно будет изменять значения на сервере и снова раскидывать новые клиентские конфиги друзьям?

@EugeneTM
Copy link

EugeneTM commented Feb 6, 2025

Отвечу вместо автора:

  1. AllowedIPs выбраны так, чтобы исключить немаршрутизируемые диапазоны 10., 172..., 192.168., чтобы продолжала работать локалка. Исключить их не так просто, поэтому такой выбор, как и в оригинальном WG. По-моему, это глючит.
  2. Гугловский DNS оч хороший и распределён географически
  3. Одинаковые. Если изменить значения, нужно будет их менять и у клиентов.

@sekrett
Copy link

sekrett commented Feb 15, 2025

Подправил на

random.seed()
    jc = random.randint(3, 10)
    jmin = random.randint(3, 20)
    jmax = random.randint(jmin+1, 50)

Это нужно сделать в двух местах, код повторяется.

@EviLinArm
Copy link

Можно заменить код, чтобы "клиенты" занимали все свободные ip адреса в конфиге, а не только по порядку.

Этот кусок

    if opt.ipaddr:
        ipaddr = opt.ipaddr
    else:
        if max_addr is None:
            max_addr = IPAddr(srv['Address'])
            max_addr.ip[3] += 1
            max_addr.mask = 32
            ipaddr = str(max_addr)
        else:
            max_addr.ip[3] += 1
            ipaddr = str(max_addr)
        if max_addr.ip[3] >= 254:
            raise RuntimeError(f'ERROR: There are no more free IP-addresses')

На вот этот

    if opt.ipaddr:
        ipaddr = opt.ipaddr
    else:
        server_ip = IPAddr(srv['Address'])
        start_octet = server_ip.ip[3] + 1
        if start_octet >= 254:
            raise RuntimeError("ERROR: Нет свободных IP-адресов для клиентов (начальный адрес слишком высокий)")

        used_octets = set()
        for peer_name, peer in cfg.peer.items():
            peer_ip = IPAddr(peer['AllowedIPs'])
            used_octets.add(peer_ip.ip[3])

        free_octet = None
        for octet in range(start_octet, 254):
            if octet not in used_octets:
                free_octet = octet
                break

        if free_octet is None:
            raise RuntimeError("ERROR: Нет свободных IP-адресов для клиентов")

        new_ip = f"{server_ip.ip[0]}.{server_ip.ip[1]}.{server_ip.ip[2]}.{free_octet}"
        ipaddr = new_ip + '/32'

@gitfuntik
Copy link

gitfuntik commented Mar 4, 2025

По подсказкам нейросети.
Можно "закольцевать" выдачу IP.
Вместо +1 к уже выданным IP (прописанным в awg0.conf) и ошибке при достижении финального октета, скрипт будет искать ближайший свободный от первого IP и только если вообще все IP заняты, выдаст ошибку.

1. Импортируйте модуль ipaddress
import ipaddress

2. Добавьте функцию поиска свободного IP

def find_free_ip(used_ips, base_network):
    """
    Ищет первый свободный IP-адрес в подсети, заданной base_network,
    исключая уже использованные адреса из множества used_ips.
    """
    net = ipaddress.ip_network(base_network, strict=False)
    for ip in net.hosts():
        if str(ip) not in used_ips:
            return str(ip)
    raise RuntimeError("ERROR: Нет свободных IP-адресов в сети")

3. Измените функцию add_client

def add_client(args):
    cfg = WGConfig(get_main_config_path())
    srv = cfg.iface
    c_name = args.addcl
    print(f'Add new client config "{c_name}"...')

    # Проверяем, что имя клиента уникально
    for peer_name in cfg.peer.keys():
        if peer_name.lower() == c_name.lower():
            raise RuntimeError(f'ERROR: peer with name "{c_name}" already exists!')

    # Собираем уже использованные IP-адреса
    used_ips = set()
    for peer in cfg.peer.values():
        used_ips.add(peer['AllowedIPs'].split('/')[0])  # убираем маску, если есть

    # Если IP для клиента задан вручную, проверяем его
    if args.ipaddr:
        ipaddr = args.ipaddr
        if ipaddr in used_ips:
            raise RuntimeError(f'ERROR: IP-addr "{ipaddr}" already used!')
    else:
        # Берём базовую подсеть из серверного конфига
        base_network = srv['Address']  # например, "10.9.9.1/22"
        ipaddr = find_free_ip(used_ips, base_network)

    priv_key, pub_key = gen_pair_keys()
    with open(get_main_config_path(), 'rb') as file:
        srvcfg = file.read().decode('utf8')

    # Формируем новую запись для клиента
    srvcfg += f'\n[Peer]\n'
    srvcfg += f'#_Name = {c_name}\n'
    srvcfg += f'#_GenKeyTime = {datetime.datetime.now().isoformat()}\n'
    srvcfg += f'#_PrivateKey = {priv_key}\n'
    srvcfg += f'PublicKey = {pub_key}\n'
    srvcfg += f'AllowedIPs = {ipaddr}/32\n'

    with open(get_main_config_path(), 'w', newline='\n') as file:
        file.write(srvcfg)

    print(f'New client "{c_name}" added! IP-Addr: "{ipaddr}"')

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment