Skip to content

Instantly share code, notes, and snippets.

@remittor
Last active March 4, 2025 08:07
Show Gist options
  • Save remittor/8c3d9ff293b2ba4b13c367cc1a69f9eb to your computer and use it in GitHub Desktop.
Save remittor/8c3d9ff293b2ba4b13c367cc1a69f9eb to your computer and use it in GitHub Desktop.
# SPDX-License-Identifier: MIT
# Author: remittor <[email protected]>
# Created: 2024
import os
import sys
import glob
import subprocess
import optparse
import random
import datetime
g_main_config_src = '.main.config'
g_main_config_fn = None
g_main_config_type = None
g_defclient_config_fn = "_defclient.config"
parser = optparse.OptionParser("usage: %prog [options]")
parser.add_option("-t", "--tmpcfg", dest="tmpcfg", default = g_defclient_config_fn)
parser.add_option("-c", "--conf", dest="confgen", action="store_true", default = False)
parser.add_option("-q", "--qrcode", dest="qrcode", action="store_true", default = False)
parser.add_option("-a", "--add", dest="addcl", default = "")
parser.add_option("-u", "--update", dest="update", default = "")
parser.add_option("-d", "--delete", dest="delete", default = "")
parser.add_option("-i", "--ipaddr", dest="ipaddr", default = "")
parser.add_option("-p", "--port", dest="port", default = None, type = 'int')
parser.add_option("", "--make", dest="makecfg", default = "")
parser.add_option("", "--tun", dest="tun", default = "")
parser.add_option("", "--create", dest="create", action="store_true", default = False)
(opt, args) = parser.parse_args()
g_defserver_config = """
[Interface]
#_GenKeyTime = <SERVER_KEY_TIME>
PrivateKey = <SERVER_PRIVATE_KEY>
#_PublicKey = <SERVER_PUBLIC_KEY>
Address = <SERVER_ADDR>
ListenPort = <SERVER_PORT>
Jc = <JC>
Jmin = <JMIN>
Jmax = <JMAX>
S1 = <S1>
S2 = <S2>
H1 = <H1>
H2 = <H2>
H3 = <H3>
H4 = <H4>
PostUp = iptables -A INPUT -p udp --dport <SERVER_PORT> -m conntrack --ctstate NEW -j ACCEPT --wait 10 --wait-interval 50; iptables -A FORWARD -i <SERVER_IFACE> -o <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; iptables -A FORWARD -i <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; iptables -t nat -A POSTROUTING -o <SERVER_IFACE> -j MASQUERADE --wait 10 --wait-interval 50; ip6tables -A FORWARD -i <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; ip6tables -t nat -A POSTROUTING -o <SERVER_IFACE> -j MASQUERADE --wait 10 --wait-interval 50
PostDown = iptables -D INPUT -p udp --dport <SERVER_PORT> -m conntrack --ctstate NEW -j ACCEPT --wait 10 --wait-interval 50; iptables -D FORWARD -i <SERVER_IFACE> -o <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; iptables -D FORWARD -i <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; iptables -t nat -D POSTROUTING -o <SERVER_IFACE> -j MASQUERADE --wait 10 --wait-interval 50; ip6tables -D FORWARD -i <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; ip6tables -t nat -D POSTROUTING -o <SERVER_IFACE> -j MASQUERADE --wait 10 --wait-interval 50
"""
g_defclient_config = """
[Interface]
PrivateKey = <CLIENT_PRIVATE_KEY>
#_PublicKey = <CLIENT_PUBLIC_KEY>
Address = <CLIENT_TUNNEL_IP>
DNS = 8.8.8.8
Jc = <JC>
Jmin = <JMIN>
Jmax = <JMAX>
S1 = <S1>
S2 = <S2>
H1 = <H1>
H2 = <H2>
H3 = <H3>
H4 = <H4>
[Peer]
AllowedIPs = 0.0.0.0/5, 8.0.0.0/7, 11.0.0.0/8, 12.0.0.0/6, 16.0.0.0/4, 32.0.0.0/3, 64.0.0.0/2, 128.0.0.0/3, 160.0.0.0/5, 168.0.0.0/6, 172.0.0.0/12, 172.32.0.0/11, 172.64.0.0/10, 172.128.0.0/9, 173.0.0.0/8, 174.0.0.0/7, 176.0.0.0/4, 192.0.0.0/9, 192.128.0.0/11, 192.160.0.0/13, 192.169.0.0/16, 192.170.0.0/15, 192.172.0.0/14, 192.176.0.0/12, 192.192.0.0/10, 193.0.0.0/8, 194.0.0.0/7, 196.0.0.0/6, 200.0.0.0/5, 208.0.0.0/4, 8.8.8.8/32
Endpoint = <SERVER_ADDR>:<SERVER_PORT>
PersistentKeepalive = 60
PublicKey = <SERVER_PUBLIC_KEY>
"""
class IPAddr():
def __init__(self, ipaddr = None):
self.ip = [ 0, 0, 0, 0 ]
self.mask = None
if ipaddr:
self.init(ipaddr)
def init(self, ipaddr):
_ipaddr = ipaddr
if not ipaddr:
raise RuntimeError(f'ERROR: Incorrect IP-Addr: "{_ipaddr}"')
if ' ' in ipaddr:
raise RuntimeError(f'ERROR: Incorrect IP-Addr: "{_ipaddr}"')
if ',' in ipaddr:
raise RuntimeError(f'ERROR: Incorrect IP-Addr: "{_ipaddr}"')
self.ip = [ 0, 0, 0, 0 ]
self.mask = None
if '/' in ipaddr:
self.mask = int(ipaddr.split('/')[1])
ipaddr = ipaddr.split('/')[0]
nlist = ipaddr.split('.')
if len(nlist) != 4:
raise RuntimeError(f'ERROR: Incorrect IP-addr: "{_ipaddr}"')
for n, num in enumerate(nlist):
self.ip[n] = int(num)
def __str__(self):
out = f'{self.ip[0]}.{self.ip[1]}.{self.ip[2]}.{self.ip[3]}'
if self.mask:
out += '/' + str(self.mask)
return out
class WGConfig():
def __init__(self, filename = None):
self.lines = [ ]
self.iface = { }
self.peer = { }
self.idsline = { }
self.cfg_fn = None
if filename:
self.load(filename)
def load(self, filename):
self.cfg_fn = None
self.lines = [ ]
self.iface = { }
self.peer = { }
self.idsline = { }
with open(filename, 'r') as file:
lines = file.readlines()
iface = None
secdata = [ ]
secdata_item = None
secline = [ ]
secline_item = None
for n, line in enumerate(lines):
line = line.rstrip()
self.lines.append(line)
if line.strip() == '':
continue
if line.startswith(' ') and not line.strip().startswith('#'):
raise RuntimeError(f'ERROR_CFG: Incorrect line #{n} into config "{filename}"')
if line.startswith('#') and not line.startswith('#_'):
continue
if line.startswith('[') and line.endswith(']'):
section_name = line[1:-1]
if not section_name:
raise RuntimeError(f'ERROR_CFG: Incorrect section name: "{section_name}" (#{n+1})')
#print(secname)
secdata_item = { "_section_name": section_name.lower() }
secline_item = { "_section_name": n }
if section_name.lower() == 'interface':
if iface:
raise RuntimeError(f'ERROR_CFG: Found second section Interface in line #{n+1}')
iface = secdata_item
elif section_name.lower() == 'peer':
pass
else:
raise RuntimeError(f'ERROR_CFG: Found incorrect section "{section_name}" in line #{n+1}')
secdata.append(secdata_item)
secline.append(secline_item)
continue
if line.startswith('#_') and ' = ' in line:
line = line[2:]
if line.startswith('#'):
continue
if ' = ' not in line:
raise RuntimeError(f'ERROR_CFG: Incorrect line into config: "{line}" (#{n+1})')
xv = line.find(' = ')
if xv <= 0:
raise RuntimeError(f'ERROR_CFG: Incorrect line into config: "{line}" (#{n+1})')
vname = line[:xv].strip()
value = line[xv+3:].strip()
#print(f' "{vname}" = "{value}"')
if not secdata_item:
raise RuntimeError(f'ERROR_CFG: Parameter "{vname}" have unknown section! (#{n+1})')
section_name = secdata_item['_section_name']
if vname in secdata_item:
raise RuntimeError(f'ERROR_CFG: Found duplicate of param "{vname}" into section "{section_name}" (#{n+1})')
secdata_item[vname] = value
secline_item[vname] = n
if not iface:
raise RuntimeError(f'ERROR_CFG: Cannot found section Interface!')
for i, item in enumerate(secdata):
line = secline[i]
peer_name = ""
if item['_section_name'] == 'interface':
self.iface = item
peer_name = "__this_server__"
if 'PublicKey' not in item:
raise RuntimeError(f'ERROR_CFG: Cannot found PublicKey in Interface')
if 'PrivateKey' not in item:
raise RuntimeError(f'ERROR_CFG: Cannot found PrivateKey in Interface')
else:
if 'Name' in item:
peer_name = item['Name']
if not peer_name:
raise RuntimeError(f'ERROR_CFG: Invalid peer Name in line #{line["Name"]}')
elif 'PublicKey' in item:
peer_name = item['PublicKey']
if not peer_name:
raise RuntimeError(f'ERROR_CFG: Invalid peer PublicKey in line #{line["PublicKey"]}')
else:
raise RuntimeError(f'ERROR_CFG: Invalid peer data in line #{line["_section_name"]}')
if 'AllowedIPs' not in item:
raise RuntimeError(f'ERROR_CFG: Cannot found "AllowedIPs" into peer "{peer_name}"')
if peer_name in self.peer:
raise RuntimeError(f'ERROR_CFG: Found duplicate peer with name "{peer_name}"')
self.peer[peer_name] = item
if peer_name in self.idsline:
raise RuntimeError(f'ERROR_CFG: Found duplicate peer with name "{peer_name}"')
min_line = line['_section_name']
max_line = min_line
self.idsline[f'{peer_name}'] = min_line
for vname in item:
self.idsline[f'{peer_name}|{vname}'] = line[vname]
if line[vname] > max_line:
max_line = line[vname]
item['_lines_range'] = ( min_line, max_line )
self.cfg_fn = filename
return len(self.peer)
def save(self, filename = None):
if not filename:
filename = self.cfg_fn
if not self.lines:
raise RuntimeError(f'ERROR: no data')
with open(filename, 'w', newline = '\n') as file:
for line in self.lines:
file.write(line + '\n')
def del_client(self, c_name):
if c_name not in self.peer:
raise RuntimeError(f'ERROR: Not found client "{c_name}" in peer list!')
client = self.peer[c_name]
ipaddr = client['AllowedIPs']
min_line, max_line = client['_lines_range']
del self.lines[min_line:max_line+1]
del self.peer[c_name]
secsize = max_line - min_line + 1
del_list = [ ]
for k, v in self.idsline.items():
if v >= min_line and v <= max_line:
del_list.append(k)
elif v > max_line:
self.idsline[k] = v - secsize
for k in del_list:
del self.idsline[k]
return ipaddr
def set_param(self, c_name, param_name, param_value, force = False, offset = 0):
if c_name not in self.peer:
raise RuntimeError(f'ERROR: Not found client "{c_name}" in peer list!')
line_prefix = ""
if param_name.startswith('_'):
line_prefix = "#_"
param_name = param_name[1:]
client = self.peer[c_name]
min_line, max_line = client['_lines_range']
if param_name in client:
nline = self.idsline[f'{c_name}|{param_name}']
line = self.lines[nline]
if line.startswith('#_'):
line_prefix = "#_"
self.lines[nline] = f'{line_prefix}{param_name} = {param_value}'
return
if not force:
raise RuntimeError(f'ERROR: Param "{param_name}" not found for client "{c_name}"')
new_line = f'{line_prefix}{param_name} = {param_value}'
client[param_name] = param_value
secsize = max_line - min_line + 1
if offset >= secsize:
raise RuntimeError(f'ERROR: Incorrect offset value = {offset} (secsize = {secsize})')
pos = max_line + 1 if offset <= 0 else min_line + offset
for k, v in self.idsline.items():
if v >= pos:
self.idsline[k] = v + 1
self.idsline[f'{c_name}|{param_name}'] = pos
self.lines.insert(pos, new_line)
return
def exec_cmd(cmd, input = None, shell = True, check = True, timeout = None):
proc = subprocess.run(cmd, input = input, shell = shell, check = check,
timeout = timeout, encoding = 'utf8',
stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
rc = proc.returncode
out = proc.stdout
return rc, out
def get_main_iface():
rc, out = exec_cmd('ip link show')
if rc:
raise RuntimeError(f'ERROR: Cannot get net interfaces')
for line in out.split('\n'):
if '<BROADCAST' in line and 'state UP' in line:
xv = line.split(':')
return xv[1].strip()
return None
def get_ext_ipaddr():
rc, out = exec_cmd('curl -4 -s icanhazip.com')
if rc:
raise RuntimeError(f'ERROR: Cannot get ext IP-Addr')
lines = out.split('\n')
ipaddr = lines[-1] if lines[-1] else lines[-2]
ipaddr = IPAddr(ipaddr)
return str(ipaddr)
def gen_pair_keys(cfg_type = None):
global g_main_config_type
if sys.platform == 'win32':
return 'client_priv_key', 'client_pub_key'
if not cfg_type:
cfg_type = g_main_config_type
if not cfg_type:
raise
wgtool = cfg_type.lower()
rc, out = exec_cmd(f'{wgtool} genkey')
if rc:
raise RuntimeError(f'ERROR: Cannot generate private key')
priv_key = out.strip()
if not priv_key:
raise RuntimeError(f'ERROR: Cannot generate private Key')
rc, out = exec_cmd(f'{wgtool} pubkey', input = priv_key + '\n')
if rc:
raise RuntimeError(f'ERROR: Cannot generate public key')
pub_key = out.strip()
if not pub_key:
raise RuntimeError(f'ERROR: Cannot generate public Key')
return priv_key, pub_key
def get_main_config_path(check = True):
global g_main_config_fn
global g_main_config_type
if not os.path.exists(g_main_config_src):
raise RuntimeError(f'ERROR: file "{g_main_config_src}" not found!')
with open(g_main_config_src, 'r') as file:
lines = file.readlines()
g_main_config_fn = lines[0].strip()
cfg_exists = os.path.exists(g_main_config_fn)
g_main_config_type = 'WG'
if os.path.basename(g_main_config_fn).startswith('a'):
g_main_config_type = 'AWG'
if check:
if not cfg_exists:
raise RuntimeError(f'ERROR: Main {g_main_config_type} config file "{g_main_config_fn}" not found!')
return g_main_config_fn
# -------------------------------------------------------------------------------------
if opt.makecfg:
g_main_config_fn = opt.makecfg
if os.path.exists(g_main_config_fn):
raise RuntimeError(f'ERROR: file "{g_main_config_fn}" already exists!')
m_cfg_type = 'WG'
if os.path.basename(g_main_config_fn).startswith('a'):
m_cfg_type = 'AWG'
print(f'Make {m_cfg_type} server config: "{g_main_config_fn}"...')
main_iface = get_main_iface()
if not main_iface:
raise RuntimeError(f'ERROR: Cannot get main network interface!')
print(f'Main network iface: "{main_iface}"')
if opt.port <= 1000 or opt.port > 65530:
raise RuntimeError(f'ERROR: Incorrect argument port = {opt.port}')
if not opt.ipaddr:
raise RuntimeError(f'ERROR: Incorrect argument ipaddr = "{opt.ipaddr}"')
ipaddr = IPAddr(opt.ipaddr)
if not ipaddr.mask:
raise RuntimeError(f'ERROR: Incorrect argument ipaddr = "{opt.ipaddr}"')
if opt.tun:
tun_name = opt.tun
else:
cfg_name = os.path.basename(g_main_config_fn)
tun_name = os.path.splitext(cfg_name)[0].strip()
print(f'Tunnel iface: "{tun_name}"')
priv_key, pub_key = gen_pair_keys(m_cfg_type)
random.seed()
jc = random.randint(3, 127)
jmin = random.randint(3, 700)
jmax = random.randint(jmin+1, 1270)
out = g_defserver_config
out = out.replace('<SERVER_KEY_TIME>', datetime.datetime.now().isoformat())
out = out.replace('<SERVER_PRIVATE_KEY>', priv_key)
out = out.replace('<SERVER_PUBLIC_KEY>', pub_key)
out = out.replace('<SERVER_ADDR>', str(ipaddr))
out = out.replace('<SERVER_PORT>', str(opt.port))
if m_cfg_type == 'AWG':
out = out.replace('<JC>', str(jc))
out = out.replace('<JMIN>', str(jmin))
out = out.replace('<JMAX>', str(jmax))
out = out.replace('<S1>', str(random.randint(3, 127)))
out = out.replace('<S2>', str(random.randint(3, 127)))
out = out.replace('<H1>', str(random.randint(0x10000011, 0x7FFFFF00)))
out = out.replace('<H2>', str(random.randint(0x10000011, 0x7FFFFF00)))
out = out.replace('<H3>', str(random.randint(0x10000011, 0x7FFFFF00)))
out = out.replace('<H4>', str(random.randint(0x10000011, 0x7FFFFF00)))
else:
out = out.replace('\nJc = <' , '\n# ')
out = out.replace('\nJmin = <', '\n# ')
out = out.replace('\nJmax = <', '\n# ')
out = out.replace('\nS1 = <' , '\n# ')
out = out.replace('\nS2 = <' , '\n# ')
out = out.replace('\nH1 = <' , '\n# ')
out = out.replace('\nH2 = <' , '\n# ')
out = out.replace('\nH3 = <' , '\n# ')
out = out.replace('\nH4 = <' , '\n# ')
out = out.replace('<SERVER_IFACE>', main_iface)
out = out.replace('<SERVER_TUN>', tun_name)
with open(g_main_config_fn, 'w', newline = '\n') as file:
file.write(out)
print(f'{m_cfg_type} server config file "{g_main_config_fn}" created!')
with open(g_main_config_src, 'w', newline = '\n') as file:
file.write(g_main_config_fn)
sys.exit(0)
# -------------------------------------------------------------------------------------
get_main_config_path(check = True)
if opt.create:
if os.path.exists(opt.tmpcfg):
raise RuntimeError(f'ERROR: file "{opt.tmpcfg}" already exists!')
print(f'Create template for client configs: "{opt.tmpcfg}"...')
os.remove(opt.tmpcfg) if os.path.exists(opt.tmpcfg) else None
if opt.ipaddr:
ipaddr = opt.ipaddr
else:
ext_ipaddr = get_ext_ipaddr()
print(f'External IP-Addr: "{ext_ipaddr}"')
ipaddr = ext_ipaddr
ipaddr = IPAddr(ipaddr)
if ipaddr.mask:
raise RuntimeError(f'ERROR: Incorrect argument ipaddr = "{opt.ipaddr}"')
print(f'Server IP-Addr: "{ipaddr}"')
out = g_defclient_config
out = out.replace('<SERVER_ADDR>', str(ipaddr))
if g_main_config_type != 'AWG':
out = out.replace('\nJc = <' , '\n# ')
out = out.replace('\nJmin = <', '\n# ')
out = out.replace('\nJmax = <', '\n# ')
out = out.replace('\nS1 = <' , '\n# ')
out = out.replace('\nS2 = <' , '\n# ')
out = out.replace('\nH1 = <' , '\n# ')
out = out.replace('\nH2 = <' , '\n# ')
out = out.replace('\nH3 = <' , '\n# ')
out = out.replace('\nH4 = <' , '\n# ')
with open(opt.tmpcfg, 'w', newline = '\n') as file:
file.write(out)
print(f'Template client config file "{opt.tmpcfg}" created!')
sys.exit(0)
# -------------------------------------------------------------------------------------
xopt = [ opt.addcl, opt.update, opt.delete ]
copt = [ x for x in xopt if len(x) > 0 ]
if copt and len(copt) >= 2:
raise RuntimeError(f'ERROR: Incorrect arguments! Too many actions!')
if opt.addcl:
cfg = WGConfig(g_main_config_fn)
srv = cfg.iface
c_name = opt.addcl
print(f'Add new client config "{c_name}"...')
max_addr = None
for peer_name, peer in cfg.peer.items():
if peer_name.lower() == c_name.lower():
raise RuntimeError(f'ERROR: peer with name "{c_name}" already exists!')
if opt.ipaddr:
addr = IPAddr(opt.ipaddr)
addr.mask = None
addr = str(addr)
p_addr = IPAddr(peer['AllowedIPs'])
p_addr.mask = None
p_addr = str(p_addr)
if addr == p_addr:
raise RuntimeError(f'ERROR: IP-addr "{opt.ipaddr}" already used!')
addr = IPAddr(peer['AllowedIPs'])
if not max_addr or addr.ip[3] > max_addr.ip[3]:
max_addr = addr
priv_key, pub_key = gen_pair_keys()
with open(g_main_config_fn, 'rb') as file:
srvcfg = file.read()
srvcfg = srvcfg.decode('utf8')
if opt.ipaddr:
ipaddr = opt.ipaddr
else:
if max_addr is None:
max_addr = IPAddr(srv['Address'])
max_addr.ip[3] += 1
max_addr.mask = 32
ipaddr = str(max_addr)
else:
max_addr.ip[3] += 1
ipaddr = str(max_addr)
if max_addr.ip[3] >= 254:
raise RuntimeError(f'ERROR: There are no more free IP-addresses')
srvcfg += f'\n'
srvcfg += f'[Peer]\n'
srvcfg += f'#_Name = {c_name}\n'
srvcfg += f'#_GenKeyTime = {datetime.datetime.now().isoformat()}\n'
srvcfg += f'#_PrivateKey = {priv_key}\n'
srvcfg += f'PublicKey = {pub_key}\n'
srvcfg += f'AllowedIPs = {ipaddr}\n'
with open(g_main_config_fn, 'w', newline = '\n') as file:
file.write(srvcfg)
print(f'New client "{c_name}" added! IP-Addr: "{ipaddr}"')
if opt.update:
cfg = WGConfig(g_main_config_fn)
p_name = opt.update
print(f'Update keys for client "{p_name}"...')
priv_key, pub_key = gen_pair_keys()
cfg.set_param(p_name, '_PrivateKey', priv_key, force = True, offset = 2)
cfg.set_param(p_name, 'PublicKey', pub_key)
gentime = datetime.datetime.now().isoformat()
cfg.set_param(p_name, '_GenKeyTime', gentime, force = True, offset = 2)
ipaddr = cfg.peer[p_name]['AllowedIPs']
cfg.save()
print(f'Keys for client "{p_name}" updated! IP-Addr: "{ipaddr}"')
if opt.delete:
cfg = WGConfig(g_main_config_fn)
p_name = opt.delete
print(f'Delete client "{p_name}"...')
ipaddr = cfg.del_client(p_name)
cfg.save()
print(f'Client "{p_name}" deleted! IP-Addr: "{ipaddr}"')
if opt.confgen:
cfg = WGConfig(g_main_config_fn)
srv = cfg.iface
print('Generate client configs...')
if not os.path.exists(opt.tmpcfg):
raise RuntimeError(f'ERROR: file "{opt.tmpcfg}" not found!')
with open(opt.tmpcfg, 'r') as file:
tmpcfg = file.read()
flst = glob.glob("*.conf")
for fn in flst:
if fn.endswith('awg0.conf'):
continue
if os.path.exists(fn):
os.remove(fn)
flst = glob.glob("*.png")
for fn in flst:
if os.path.exists(fn):
os.remove(fn)
random.seed()
for peer_name, peer in cfg.peer.items():
if 'Name' not in peer or 'PrivateKey' not in peer:
print(f'Skip peer with pubkey "{peer["PublicKey"]}"')
continue
jc = random.randint(3, 127)
jmin = random.randint(3, 700)
jmax = random.randint(jmin+1, 1270)
out = tmpcfg[:]
out = out.replace('<CLIENT_PRIVATE_KEY>', peer['PrivateKey'])
out = out.replace('<CLIENT_PUBLIC_KEY>', peer['PublicKey'])
out = out.replace('<CLIENT_TUNNEL_IP>', peer['AllowedIPs'])
out = out.replace('<JC>', str(jc))
out = out.replace('<JMIN>', str(jmin))
out = out.replace('<JMAX>', str(jmax))
out = out.replace('<S1>', srv['S1'])
out = out.replace('<S2>', srv['S2'])
out = out.replace('<H1>', srv['H1'])
out = out.replace('<H2>', srv['H2'])
out = out.replace('<H3>', srv['H3'])
out = out.replace('<H4>', srv['H4'])
out = out.replace('<SERVER_PORT>', srv['ListenPort'])
out = out.replace('<SERVER_PUBLIC_KEY>', srv['PublicKey'])
fn = f'{peer_name}.conf'
with open(fn, 'w', newline = '\n') as file:
file.write(out)
if opt.qrcode:
print('Generate QR codes...')
flst = glob.glob("*.png")
for fn in flst:
if os.path.exists(fn):
os.remove(fn)
flst = glob.glob("*.conf")
if not flst:
raise RuntimeError(f'ERROR: client configs not founded!')
import qrcode
for fn in flst:
if fn.endswith('awg0.conf'):
continue
with open(fn, 'rb') as file:
conf = file.read()
conf = conf.decode('utf8')
name = os.path.splitext(fn)[0]
img = qrcode.make(conf)
img.save(f'{name}.png')
print('===== OK =====')
@karavan
Copy link

karavan commented Dec 16, 2024

@Vitaly-ZS
26-я строчка скрипта указывает параметр, через который можно передать IP сервера

@romankurbatov
Copy link

romankurbatov commented Dec 24, 2024

@pashakrapivin Здравствуйте! У меня была такая же проблема на МТС. Попробуйте уменьшить значения Jc/Jmin/Jmax и на сервере, и на клиенте. Например, взять Jc=4, Jmin=10, Jmax=50. Скорее всего, что-то близкое задано и в AmneziaVPN, там можно посмотреть значения параметров. Вот ещё здесь ту же проблему обсуждают: amnezia-vpn/amnezia-client#1010

@pashakrapivin
Copy link

@pashakrapivin Здравствуйте! У меня была такая же проблема на МТС. Попробуйте уменьшить значения Jc/Jmin/Jmax и на сервере, и на клиенте. Например, взять Jc=4, Jmin=10, Jmax=50. Скорее всего, что-то близкое задано и в AmneziaVPN, там можно посмотреть значения параметров. Вот ещё здесь ту же проблему обсуждают: amnezia-vpn/amnezia-client#1010

Спасибо большое, это сработало. Подправил на

random.seed()
    jc = random.randint(3, 10)
    jmin = random.randint(3, 20)
    jmax = random.randint(jmin+1, 50)

AmneziaVPN ставит значения Jc = 3, Jmin = 10, Jmax = 50

@Zerogoki00
Copy link

Вы в скрипте во-первых забыли IPv6 в AllowedIPs поставить, будет его утечка
Во-вторых достаточно 0.0.0.0/0 и ::/0 поставить и всё равно будет правильно работать

@VitalyArtemiev
Copy link

А зачем 8.8.8.8/32 в allowed-ips? Вы собираетесь клиент разворачивать на гугловских серверах?

@scorpeus
Copy link

scorpeus commented Feb 6, 2025

Все работает, только к автору несколько вопросов?

  1. Почему в клиентском AllowedIPs (строка 73) именно такие адреса? По какому принципу они отбирались. По дефолту же 0.0.0.0/0, ::0 должно быть, чтоб весь трафик залетал в туннель.
  2. Почему DNS 8.8.8.8 именно гугловский? Я понимаю что все эти значения можно изменять кому как нравится, но просто интересно почему автор выбрал именно этот DNS, может там своя философия?
  3. Jc Jmin Jmax S1 S2 H1 H2 H3 H4 Эти значения должны быть на сервере и на клиенте одинаковые? А если я раздам всем своим друзьям конфиги клиентские и у кого-то не будет работать как у парней выше провайдер МТС. Нужно будет изменять значения на сервере и снова раскидывать новые клиентские конфиги друзьям?

@EugeneTM
Copy link

EugeneTM commented Feb 6, 2025

Отвечу вместо автора:

  1. AllowedIPs выбраны так, чтобы исключить немаршрутизируемые диапазоны 10., 172..., 192.168., чтобы продолжала работать локалка. Исключить их не так просто, поэтому такой выбор, как и в оригинальном WG. По-моему, это глючит.
  2. Гугловский DNS оч хороший и распределён географически
  3. Одинаковые. Если изменить значения, нужно будет их менять и у клиентов.

@sekrett
Copy link

sekrett commented Feb 15, 2025

Подправил на

random.seed()
    jc = random.randint(3, 10)
    jmin = random.randint(3, 20)
    jmax = random.randint(jmin+1, 50)

Это нужно сделать в двух местах, код повторяется.

@EviLinArm
Copy link

Можно заменить код, чтобы "клиенты" занимали все свободные ip адреса в конфиге, а не только по порядку.

Этот кусок

    if opt.ipaddr:
        ipaddr = opt.ipaddr
    else:
        if max_addr is None:
            max_addr = IPAddr(srv['Address'])
            max_addr.ip[3] += 1
            max_addr.mask = 32
            ipaddr = str(max_addr)
        else:
            max_addr.ip[3] += 1
            ipaddr = str(max_addr)
        if max_addr.ip[3] >= 254:
            raise RuntimeError(f'ERROR: There are no more free IP-addresses')

На вот этот

    if opt.ipaddr:
        ipaddr = opt.ipaddr
    else:
        server_ip = IPAddr(srv['Address'])
        start_octet = server_ip.ip[3] + 1
        if start_octet >= 254:
            raise RuntimeError("ERROR: Нет свободных IP-адресов для клиентов (начальный адрес слишком высокий)")

        used_octets = set()
        for peer_name, peer in cfg.peer.items():
            peer_ip = IPAddr(peer['AllowedIPs'])
            used_octets.add(peer_ip.ip[3])

        free_octet = None
        for octet in range(start_octet, 254):
            if octet not in used_octets:
                free_octet = octet
                break

        if free_octet is None:
            raise RuntimeError("ERROR: Нет свободных IP-адресов для клиентов")

        new_ip = f"{server_ip.ip[0]}.{server_ip.ip[1]}.{server_ip.ip[2]}.{free_octet}"
        ipaddr = new_ip + '/32'

@gitfuntik
Copy link

gitfuntik commented Mar 4, 2025

По подсказкам нейросети.
Можно "закольцевать" выдачу IP.
Вместо +1 к уже выданным IP (прописанным в awg0.conf) и ошибке при достижении финального октета, скрипт будет искать ближайший свободный от первого IP и только если вообще все IP заняты, выдаст ошибку.

1. Импортируйте модуль ipaddress
import ipaddress

2. Добавьте функцию поиска свободного IP

def find_free_ip(used_ips, base_network):
    """
    Ищет первый свободный IP-адрес в подсети, заданной base_network,
    исключая уже использованные адреса из множества used_ips.
    """
    net = ipaddress.ip_network(base_network, strict=False)
    for ip in net.hosts():
        if str(ip) not in used_ips:
            return str(ip)
    raise RuntimeError("ERROR: Нет свободных IP-адресов в сети")

3. Измените функцию add_client

def add_client(args):
    cfg = WGConfig(get_main_config_path())
    srv = cfg.iface
    c_name = args.addcl
    print(f'Add new client config "{c_name}"...')

    # Проверяем, что имя клиента уникально
    for peer_name in cfg.peer.keys():
        if peer_name.lower() == c_name.lower():
            raise RuntimeError(f'ERROR: peer with name "{c_name}" already exists!')

    # Собираем уже использованные IP-адреса
    used_ips = set()
    for peer in cfg.peer.values():
        used_ips.add(peer['AllowedIPs'].split('/')[0])  # убираем маску, если есть

    # Если IP для клиента задан вручную, проверяем его
    if args.ipaddr:
        ipaddr = args.ipaddr
        if ipaddr in used_ips:
            raise RuntimeError(f'ERROR: IP-addr "{ipaddr}" already used!')
    else:
        # Берём базовую подсеть из серверного конфига
        base_network = srv['Address']  # например, "10.9.9.1/22"
        ipaddr = find_free_ip(used_ips, base_network)

    priv_key, pub_key = gen_pair_keys()
    with open(get_main_config_path(), 'rb') as file:
        srvcfg = file.read().decode('utf8')

    # Формируем новую запись для клиента
    srvcfg += f'\n[Peer]\n'
    srvcfg += f'#_Name = {c_name}\n'
    srvcfg += f'#_GenKeyTime = {datetime.datetime.now().isoformat()}\n'
    srvcfg += f'#_PrivateKey = {priv_key}\n'
    srvcfg += f'PublicKey = {pub_key}\n'
    srvcfg += f'AllowedIPs = {ipaddr}/32\n'

    with open(get_main_config_path(), 'w', newline='\n') as file:
        file.write(srvcfg)

    print(f'New client "{c_name}" added! IP-Addr: "{ipaddr}"')

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment