Skip to content

Instantly share code, notes, and snippets.

@remittor
Last active March 4, 2025 08:07
Show Gist options
  • Save remittor/8c3d9ff293b2ba4b13c367cc1a69f9eb to your computer and use it in GitHub Desktop.
Save remittor/8c3d9ff293b2ba4b13c367cc1a69f9eb to your computer and use it in GitHub Desktop.
# SPDX-License-Identifier: MIT
# Author: remittor <[email protected]>
# Created: 2024
import os
import sys
import glob
import subprocess
import optparse
import random
import datetime
g_main_config_src = '.main.config'
g_main_config_fn = None
g_main_config_type = None
g_defclient_config_fn = "_defclient.config"
parser = optparse.OptionParser("usage: %prog [options]")
parser.add_option("-t", "--tmpcfg", dest="tmpcfg", default = g_defclient_config_fn)
parser.add_option("-c", "--conf", dest="confgen", action="store_true", default = False)
parser.add_option("-q", "--qrcode", dest="qrcode", action="store_true", default = False)
parser.add_option("-a", "--add", dest="addcl", default = "")
parser.add_option("-u", "--update", dest="update", default = "")
parser.add_option("-d", "--delete", dest="delete", default = "")
parser.add_option("-i", "--ipaddr", dest="ipaddr", default = "")
parser.add_option("-p", "--port", dest="port", default = None, type = 'int')
parser.add_option("", "--make", dest="makecfg", default = "")
parser.add_option("", "--tun", dest="tun", default = "")
parser.add_option("", "--create", dest="create", action="store_true", default = False)
(opt, args) = parser.parse_args()
g_defserver_config = """
[Interface]
#_GenKeyTime = <SERVER_KEY_TIME>
PrivateKey = <SERVER_PRIVATE_KEY>
#_PublicKey = <SERVER_PUBLIC_KEY>
Address = <SERVER_ADDR>
ListenPort = <SERVER_PORT>
Jc = <JC>
Jmin = <JMIN>
Jmax = <JMAX>
S1 = <S1>
S2 = <S2>
H1 = <H1>
H2 = <H2>
H3 = <H3>
H4 = <H4>
PostUp = iptables -A INPUT -p udp --dport <SERVER_PORT> -m conntrack --ctstate NEW -j ACCEPT --wait 10 --wait-interval 50; iptables -A FORWARD -i <SERVER_IFACE> -o <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; iptables -A FORWARD -i <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; iptables -t nat -A POSTROUTING -o <SERVER_IFACE> -j MASQUERADE --wait 10 --wait-interval 50; ip6tables -A FORWARD -i <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; ip6tables -t nat -A POSTROUTING -o <SERVER_IFACE> -j MASQUERADE --wait 10 --wait-interval 50
PostDown = iptables -D INPUT -p udp --dport <SERVER_PORT> -m conntrack --ctstate NEW -j ACCEPT --wait 10 --wait-interval 50; iptables -D FORWARD -i <SERVER_IFACE> -o <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; iptables -D FORWARD -i <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; iptables -t nat -D POSTROUTING -o <SERVER_IFACE> -j MASQUERADE --wait 10 --wait-interval 50; ip6tables -D FORWARD -i <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; ip6tables -t nat -D POSTROUTING -o <SERVER_IFACE> -j MASQUERADE --wait 10 --wait-interval 50
"""
g_defclient_config = """
[Interface]
PrivateKey = <CLIENT_PRIVATE_KEY>
#_PublicKey = <CLIENT_PUBLIC_KEY>
Address = <CLIENT_TUNNEL_IP>
DNS = 8.8.8.8
Jc = <JC>
Jmin = <JMIN>
Jmax = <JMAX>
S1 = <S1>
S2 = <S2>
H1 = <H1>
H2 = <H2>
H3 = <H3>
H4 = <H4>
[Peer]
AllowedIPs = 0.0.0.0/5, 8.0.0.0/7, 11.0.0.0/8, 12.0.0.0/6, 16.0.0.0/4, 32.0.0.0/3, 64.0.0.0/2, 128.0.0.0/3, 160.0.0.0/5, 168.0.0.0/6, 172.0.0.0/12, 172.32.0.0/11, 172.64.0.0/10, 172.128.0.0/9, 173.0.0.0/8, 174.0.0.0/7, 176.0.0.0/4, 192.0.0.0/9, 192.128.0.0/11, 192.160.0.0/13, 192.169.0.0/16, 192.170.0.0/15, 192.172.0.0/14, 192.176.0.0/12, 192.192.0.0/10, 193.0.0.0/8, 194.0.0.0/7, 196.0.0.0/6, 200.0.0.0/5, 208.0.0.0/4, 8.8.8.8/32
Endpoint = <SERVER_ADDR>:<SERVER_PORT>
PersistentKeepalive = 60
PublicKey = <SERVER_PUBLIC_KEY>
"""
class IPAddr():
def __init__(self, ipaddr = None):
self.ip = [ 0, 0, 0, 0 ]
self.mask = None
if ipaddr:
self.init(ipaddr)
def init(self, ipaddr):
_ipaddr = ipaddr
if not ipaddr:
raise RuntimeError(f'ERROR: Incorrect IP-Addr: "{_ipaddr}"')
if ' ' in ipaddr:
raise RuntimeError(f'ERROR: Incorrect IP-Addr: "{_ipaddr}"')
if ',' in ipaddr:
raise RuntimeError(f'ERROR: Incorrect IP-Addr: "{_ipaddr}"')
self.ip = [ 0, 0, 0, 0 ]
self.mask = None
if '/' in ipaddr:
self.mask = int(ipaddr.split('/')[1])
ipaddr = ipaddr.split('/')[0]
nlist = ipaddr.split('.')
if len(nlist) != 4:
raise RuntimeError(f'ERROR: Incorrect IP-addr: "{_ipaddr}"')
for n, num in enumerate(nlist):
self.ip[n] = int(num)
def __str__(self):
out = f'{self.ip[0]}.{self.ip[1]}.{self.ip[2]}.{self.ip[3]}'
if self.mask:
out += '/' + str(self.mask)
return out
class WGConfig():
def __init__(self, filename = None):
self.lines = [ ]
self.iface = { }
self.peer = { }
self.idsline = { }
self.cfg_fn = None
if filename:
self.load(filename)
def load(self, filename):
self.cfg_fn = None
self.lines = [ ]
self.iface = { }
self.peer = { }
self.idsline = { }
with open(filename, 'r') as file:
lines = file.readlines()
iface = None
secdata = [ ]
secdata_item = None
secline = [ ]
secline_item = None
for n, line in enumerate(lines):
line = line.rstrip()
self.lines.append(line)
if line.strip() == '':
continue
if line.startswith(' ') and not line.strip().startswith('#'):
raise RuntimeError(f'ERROR_CFG: Incorrect line #{n} into config "{filename}"')
if line.startswith('#') and not line.startswith('#_'):
continue
if line.startswith('[') and line.endswith(']'):
section_name = line[1:-1]
if not section_name:
raise RuntimeError(f'ERROR_CFG: Incorrect section name: "{section_name}" (#{n+1})')
#print(secname)
secdata_item = { "_section_name": section_name.lower() }
secline_item = { "_section_name": n }
if section_name.lower() == 'interface':
if iface:
raise RuntimeError(f'ERROR_CFG: Found second section Interface in line #{n+1}')
iface = secdata_item
elif section_name.lower() == 'peer':
pass
else:
raise RuntimeError(f'ERROR_CFG: Found incorrect section "{section_name}" in line #{n+1}')
secdata.append(secdata_item)
secline.append(secline_item)
continue
if line.startswith('#_') and ' = ' in line:
line = line[2:]
if line.startswith('#'):
continue
if ' = ' not in line:
raise RuntimeError(f'ERROR_CFG: Incorrect line into config: "{line}" (#{n+1})')
xv = line.find(' = ')
if xv <= 0:
raise RuntimeError(f'ERROR_CFG: Incorrect line into config: "{line}" (#{n+1})')
vname = line[:xv].strip()
value = line[xv+3:].strip()
#print(f' "{vname}" = "{value}"')
if not secdata_item:
raise RuntimeError(f'ERROR_CFG: Parameter "{vname}" have unknown section! (#{n+1})')
section_name = secdata_item['_section_name']
if vname in secdata_item:
raise RuntimeError(f'ERROR_CFG: Found duplicate of param "{vname}" into section "{section_name}" (#{n+1})')
secdata_item[vname] = value
secline_item[vname] = n
if not iface:
raise RuntimeError(f'ERROR_CFG: Cannot found section Interface!')
for i, item in enumerate(secdata):
line = secline[i]
peer_name = ""
if item['_section_name'] == 'interface':
self.iface = item
peer_name = "__this_server__"
if 'PublicKey' not in item:
raise RuntimeError(f'ERROR_CFG: Cannot found PublicKey in Interface')
if 'PrivateKey' not in item:
raise RuntimeError(f'ERROR_CFG: Cannot found PrivateKey in Interface')
else:
if 'Name' in item:
peer_name = item['Name']
if not peer_name:
raise RuntimeError(f'ERROR_CFG: Invalid peer Name in line #{line["Name"]}')
elif 'PublicKey' in item:
peer_name = item['PublicKey']
if not peer_name:
raise RuntimeError(f'ERROR_CFG: Invalid peer PublicKey in line #{line["PublicKey"]}')
else:
raise RuntimeError(f'ERROR_CFG: Invalid peer data in line #{line["_section_name"]}')
if 'AllowedIPs' not in item:
raise RuntimeError(f'ERROR_CFG: Cannot found "AllowedIPs" into peer "{peer_name}"')
if peer_name in self.peer:
raise RuntimeError(f'ERROR_CFG: Found duplicate peer with name "{peer_name}"')
self.peer[peer_name] = item
if peer_name in self.idsline:
raise RuntimeError(f'ERROR_CFG: Found duplicate peer with name "{peer_name}"')
min_line = line['_section_name']
max_line = min_line
self.idsline[f'{peer_name}'] = min_line
for vname in item:
self.idsline[f'{peer_name}|{vname}'] = line[vname]
if line[vname] > max_line:
max_line = line[vname]
item['_lines_range'] = ( min_line, max_line )
self.cfg_fn = filename
return len(self.peer)
def save(self, filename = None):
if not filename:
filename = self.cfg_fn
if not self.lines:
raise RuntimeError(f'ERROR: no data')
with open(filename, 'w', newline = '\n') as file:
for line in self.lines:
file.write(line + '\n')
def del_client(self, c_name):
if c_name not in self.peer:
raise RuntimeError(f'ERROR: Not found client "{c_name}" in peer list!')
client = self.peer[c_name]
ipaddr = client['AllowedIPs']
min_line, max_line = client['_lines_range']
del self.lines[min_line:max_line+1]
del self.peer[c_name]
secsize = max_line - min_line + 1
del_list = [ ]
for k, v in self.idsline.items():
if v >= min_line and v <= max_line:
del_list.append(k)
elif v > max_line:
self.idsline[k] = v - secsize
for k in del_list:
del self.idsline[k]
return ipaddr
def set_param(self, c_name, param_name, param_value, force = False, offset = 0):
if c_name not in self.peer:
raise RuntimeError(f'ERROR: Not found client "{c_name}" in peer list!')
line_prefix = ""
if param_name.startswith('_'):
line_prefix = "#_"
param_name = param_name[1:]
client = self.peer[c_name]
min_line, max_line = client['_lines_range']
if param_name in client:
nline = self.idsline[f'{c_name}|{param_name}']
line = self.lines[nline]
if line.startswith('#_'):
line_prefix = "#_"
self.lines[nline] = f'{line_prefix}{param_name} = {param_value}'
return
if not force:
raise RuntimeError(f'ERROR: Param "{param_name}" not found for client "{c_name}"')
new_line = f'{line_prefix}{param_name} = {param_value}'
client[param_name] = param_value
secsize = max_line - min_line + 1
if offset >= secsize:
raise RuntimeError(f'ERROR: Incorrect offset value = {offset} (secsize = {secsize})')
pos = max_line + 1 if offset <= 0 else min_line + offset
for k, v in self.idsline.items():
if v >= pos:
self.idsline[k] = v + 1
self.idsline[f'{c_name}|{param_name}'] = pos
self.lines.insert(pos, new_line)
return
def exec_cmd(cmd, input = None, shell = True, check = True, timeout = None):
proc = subprocess.run(cmd, input = input, shell = shell, check = check,
timeout = timeout, encoding = 'utf8',
stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
rc = proc.returncode
out = proc.stdout
return rc, out
def get_main_iface():
rc, out = exec_cmd('ip link show')
if rc:
raise RuntimeError(f'ERROR: Cannot get net interfaces')
for line in out.split('\n'):
if '<BROADCAST' in line and 'state UP' in line:
xv = line.split(':')
return xv[1].strip()
return None
def get_ext_ipaddr():
rc, out = exec_cmd('curl -4 -s icanhazip.com')
if rc:
raise RuntimeError(f'ERROR: Cannot get ext IP-Addr')
lines = out.split('\n')
ipaddr = lines[-1] if lines[-1] else lines[-2]
ipaddr = IPAddr(ipaddr)
return str(ipaddr)
def gen_pair_keys(cfg_type = None):
global g_main_config_type
if sys.platform == 'win32':
return 'client_priv_key', 'client_pub_key'
if not cfg_type:
cfg_type = g_main_config_type
if not cfg_type:
raise
wgtool = cfg_type.lower()
rc, out = exec_cmd(f'{wgtool} genkey')
if rc:
raise RuntimeError(f'ERROR: Cannot generate private key')
priv_key = out.strip()
if not priv_key:
raise RuntimeError(f'ERROR: Cannot generate private Key')
rc, out = exec_cmd(f'{wgtool} pubkey', input = priv_key + '\n')
if rc:
raise RuntimeError(f'ERROR: Cannot generate public key')
pub_key = out.strip()
if not pub_key:
raise RuntimeError(f'ERROR: Cannot generate public Key')
return priv_key, pub_key
def get_main_config_path(check = True):
global g_main_config_fn
global g_main_config_type
if not os.path.exists(g_main_config_src):
raise RuntimeError(f'ERROR: file "{g_main_config_src}" not found!')
with open(g_main_config_src, 'r') as file:
lines = file.readlines()
g_main_config_fn = lines[0].strip()
cfg_exists = os.path.exists(g_main_config_fn)
g_main_config_type = 'WG'
if os.path.basename(g_main_config_fn).startswith('a'):
g_main_config_type = 'AWG'
if check:
if not cfg_exists:
raise RuntimeError(f'ERROR: Main {g_main_config_type} config file "{g_main_config_fn}" not found!')
return g_main_config_fn
# -------------------------------------------------------------------------------------
if opt.makecfg:
g_main_config_fn = opt.makecfg
if os.path.exists(g_main_config_fn):
raise RuntimeError(f'ERROR: file "{g_main_config_fn}" already exists!')
m_cfg_type = 'WG'
if os.path.basename(g_main_config_fn).startswith('a'):
m_cfg_type = 'AWG'
print(f'Make {m_cfg_type} server config: "{g_main_config_fn}"...')
main_iface = get_main_iface()
if not main_iface:
raise RuntimeError(f'ERROR: Cannot get main network interface!')
print(f'Main network iface: "{main_iface}"')
if opt.port <= 1000 or opt.port > 65530:
raise RuntimeError(f'ERROR: Incorrect argument port = {opt.port}')
if not opt.ipaddr:
raise RuntimeError(f'ERROR: Incorrect argument ipaddr = "{opt.ipaddr}"')
ipaddr = IPAddr(opt.ipaddr)
if not ipaddr.mask:
raise RuntimeError(f'ERROR: Incorrect argument ipaddr = "{opt.ipaddr}"')
if opt.tun:
tun_name = opt.tun
else:
cfg_name = os.path.basename(g_main_config_fn)
tun_name = os.path.splitext(cfg_name)[0].strip()
print(f'Tunnel iface: "{tun_name}"')
priv_key, pub_key = gen_pair_keys(m_cfg_type)
random.seed()
jc = random.randint(3, 127)
jmin = random.randint(3, 700)
jmax = random.randint(jmin+1, 1270)
out = g_defserver_config
out = out.replace('<SERVER_KEY_TIME>', datetime.datetime.now().isoformat())
out = out.replace('<SERVER_PRIVATE_KEY>', priv_key)
out = out.replace('<SERVER_PUBLIC_KEY>', pub_key)
out = out.replace('<SERVER_ADDR>', str(ipaddr))
out = out.replace('<SERVER_PORT>', str(opt.port))
if m_cfg_type == 'AWG':
out = out.replace('<JC>', str(jc))
out = out.replace('<JMIN>', str(jmin))
out = out.replace('<JMAX>', str(jmax))
out = out.replace('<S1>', str(random.randint(3, 127)))
out = out.replace('<S2>', str(random.randint(3, 127)))
out = out.replace('<H1>', str(random.randint(0x10000011, 0x7FFFFF00)))
out = out.replace('<H2>', str(random.randint(0x10000011, 0x7FFFFF00)))
out = out.replace('<H3>', str(random.randint(0x10000011, 0x7FFFFF00)))
out = out.replace('<H4>', str(random.randint(0x10000011, 0x7FFFFF00)))
else:
out = out.replace('\nJc = <' , '\n# ')
out = out.replace('\nJmin = <', '\n# ')
out = out.replace('\nJmax = <', '\n# ')
out = out.replace('\nS1 = <' , '\n# ')
out = out.replace('\nS2 = <' , '\n# ')
out = out.replace('\nH1 = <' , '\n# ')
out = out.replace('\nH2 = <' , '\n# ')
out = out.replace('\nH3 = <' , '\n# ')
out = out.replace('\nH4 = <' , '\n# ')
out = out.replace('<SERVER_IFACE>', main_iface)
out = out.replace('<SERVER_TUN>', tun_name)
with open(g_main_config_fn, 'w', newline = '\n') as file:
file.write(out)
print(f'{m_cfg_type} server config file "{g_main_config_fn}" created!')
with open(g_main_config_src, 'w', newline = '\n') as file:
file.write(g_main_config_fn)
sys.exit(0)
# -------------------------------------------------------------------------------------
get_main_config_path(check = True)
if opt.create:
if os.path.exists(opt.tmpcfg):
raise RuntimeError(f'ERROR: file "{opt.tmpcfg}" already exists!')
print(f'Create template for client configs: "{opt.tmpcfg}"...')
os.remove(opt.tmpcfg) if os.path.exists(opt.tmpcfg) else None
if opt.ipaddr:
ipaddr = opt.ipaddr
else:
ext_ipaddr = get_ext_ipaddr()
print(f'External IP-Addr: "{ext_ipaddr}"')
ipaddr = ext_ipaddr
ipaddr = IPAddr(ipaddr)
if ipaddr.mask:
raise RuntimeError(f'ERROR: Incorrect argument ipaddr = "{opt.ipaddr}"')
print(f'Server IP-Addr: "{ipaddr}"')
out = g_defclient_config
out = out.replace('<SERVER_ADDR>', str(ipaddr))
if g_main_config_type != 'AWG':
out = out.replace('\nJc = <' , '\n# ')
out = out.replace('\nJmin = <', '\n# ')
out = out.replace('\nJmax = <', '\n# ')
out = out.replace('\nS1 = <' , '\n# ')
out = out.replace('\nS2 = <' , '\n# ')
out = out.replace('\nH1 = <' , '\n# ')
out = out.replace('\nH2 = <' , '\n# ')
out = out.replace('\nH3 = <' , '\n# ')
out = out.replace('\nH4 = <' , '\n# ')
with open(opt.tmpcfg, 'w', newline = '\n') as file:
file.write(out)
print(f'Template client config file "{opt.tmpcfg}" created!')
sys.exit(0)
# -------------------------------------------------------------------------------------
xopt = [ opt.addcl, opt.update, opt.delete ]
copt = [ x for x in xopt if len(x) > 0 ]
if copt and len(copt) >= 2:
raise RuntimeError(f'ERROR: Incorrect arguments! Too many actions!')
if opt.addcl:
cfg = WGConfig(g_main_config_fn)
srv = cfg.iface
c_name = opt.addcl
print(f'Add new client config "{c_name}"...')
max_addr = None
for peer_name, peer in cfg.peer.items():
if peer_name.lower() == c_name.lower():
raise RuntimeError(f'ERROR: peer with name "{c_name}" already exists!')
if opt.ipaddr:
addr = IPAddr(opt.ipaddr)
addr.mask = None
addr = str(addr)
p_addr = IPAddr(peer['AllowedIPs'])
p_addr.mask = None
p_addr = str(p_addr)
if addr == p_addr:
raise RuntimeError(f'ERROR: IP-addr "{opt.ipaddr}" already used!')
addr = IPAddr(peer['AllowedIPs'])
if not max_addr or addr.ip[3] > max_addr.ip[3]:
max_addr = addr
priv_key, pub_key = gen_pair_keys()
with open(g_main_config_fn, 'rb') as file:
srvcfg = file.read()
srvcfg = srvcfg.decode('utf8')
if opt.ipaddr:
ipaddr = opt.ipaddr
else:
if max_addr is None:
max_addr = IPAddr(srv['Address'])
max_addr.ip[3] += 1
max_addr.mask = 32
ipaddr = str(max_addr)
else:
max_addr.ip[3] += 1
ipaddr = str(max_addr)
if max_addr.ip[3] >= 254:
raise RuntimeError(f'ERROR: There are no more free IP-addresses')
srvcfg += f'\n'
srvcfg += f'[Peer]\n'
srvcfg += f'#_Name = {c_name}\n'
srvcfg += f'#_GenKeyTime = {datetime.datetime.now().isoformat()}\n'
srvcfg += f'#_PrivateKey = {priv_key}\n'
srvcfg += f'PublicKey = {pub_key}\n'
srvcfg += f'AllowedIPs = {ipaddr}\n'
with open(g_main_config_fn, 'w', newline = '\n') as file:
file.write(srvcfg)
print(f'New client "{c_name}" added! IP-Addr: "{ipaddr}"')
if opt.update:
cfg = WGConfig(g_main_config_fn)
p_name = opt.update
print(f'Update keys for client "{p_name}"...')
priv_key, pub_key = gen_pair_keys()
cfg.set_param(p_name, '_PrivateKey', priv_key, force = True, offset = 2)
cfg.set_param(p_name, 'PublicKey', pub_key)
gentime = datetime.datetime.now().isoformat()
cfg.set_param(p_name, '_GenKeyTime', gentime, force = True, offset = 2)
ipaddr = cfg.peer[p_name]['AllowedIPs']
cfg.save()
print(f'Keys for client "{p_name}" updated! IP-Addr: "{ipaddr}"')
if opt.delete:
cfg = WGConfig(g_main_config_fn)
p_name = opt.delete
print(f'Delete client "{p_name}"...')
ipaddr = cfg.del_client(p_name)
cfg.save()
print(f'Client "{p_name}" deleted! IP-Addr: "{ipaddr}"')
if opt.confgen:
cfg = WGConfig(g_main_config_fn)
srv = cfg.iface
print('Generate client configs...')
if not os.path.exists(opt.tmpcfg):
raise RuntimeError(f'ERROR: file "{opt.tmpcfg}" not found!')
with open(opt.tmpcfg, 'r') as file:
tmpcfg = file.read()
flst = glob.glob("*.conf")
for fn in flst:
if fn.endswith('awg0.conf'):
continue
if os.path.exists(fn):
os.remove(fn)
flst = glob.glob("*.png")
for fn in flst:
if os.path.exists(fn):
os.remove(fn)
random.seed()
for peer_name, peer in cfg.peer.items():
if 'Name' not in peer or 'PrivateKey' not in peer:
print(f'Skip peer with pubkey "{peer["PublicKey"]}"')
continue
jc = random.randint(3, 127)
jmin = random.randint(3, 700)
jmax = random.randint(jmin+1, 1270)
out = tmpcfg[:]
out = out.replace('<CLIENT_PRIVATE_KEY>', peer['PrivateKey'])
out = out.replace('<CLIENT_PUBLIC_KEY>', peer['PublicKey'])
out = out.replace('<CLIENT_TUNNEL_IP>', peer['AllowedIPs'])
out = out.replace('<JC>', str(jc))
out = out.replace('<JMIN>', str(jmin))
out = out.replace('<JMAX>', str(jmax))
out = out.replace('<S1>', srv['S1'])
out = out.replace('<S2>', srv['S2'])
out = out.replace('<H1>', srv['H1'])
out = out.replace('<H2>', srv['H2'])
out = out.replace('<H3>', srv['H3'])
out = out.replace('<H4>', srv['H4'])
out = out.replace('<SERVER_PORT>', srv['ListenPort'])
out = out.replace('<SERVER_PUBLIC_KEY>', srv['PublicKey'])
fn = f'{peer_name}.conf'
with open(fn, 'w', newline = '\n') as file:
file.write(out)
if opt.qrcode:
print('Generate QR codes...')
flst = glob.glob("*.png")
for fn in flst:
if os.path.exists(fn):
os.remove(fn)
flst = glob.glob("*.conf")
if not flst:
raise RuntimeError(f'ERROR: client configs not founded!')
import qrcode
for fn in flst:
if fn.endswith('awg0.conf'):
continue
with open(fn, 'rb') as file:
conf = file.read()
conf = conf.decode('utf8')
name = os.path.splitext(fn)[0]
img = qrcode.make(conf)
img.save(f'{name}.png')
print('===== OK =====')
@gitfuntik
Copy link

gitfuntik commented Mar 4, 2025

По подсказкам нейросети.
Можно "закольцевать" выдачу IP.
Вместо +1 к уже выданным IP (прописанным в awg0.conf) и ошибке при достижении финального октета, скрипт будет искать ближайший свободный от первого IP и только если вообще все IP заняты, выдаст ошибку.

1. Импортируйте модуль ipaddress
import ipaddress

2. Добавьте функцию поиска свободного IP

def find_free_ip(used_ips, base_network):
    """
    Ищет первый свободный IP-адрес в подсети, заданной base_network,
    исключая уже использованные адреса из множества used_ips.
    """
    net = ipaddress.ip_network(base_network, strict=False)
    for ip in net.hosts():
        if str(ip) not in used_ips:
            return str(ip)
    raise RuntimeError("ERROR: Нет свободных IP-адресов в сети")

3. Измените функцию add_client

def add_client(args):
    cfg = WGConfig(get_main_config_path())
    srv = cfg.iface
    c_name = args.addcl
    print(f'Add new client config "{c_name}"...')

    # Проверяем, что имя клиента уникально
    for peer_name in cfg.peer.keys():
        if peer_name.lower() == c_name.lower():
            raise RuntimeError(f'ERROR: peer with name "{c_name}" already exists!')

    # Собираем уже использованные IP-адреса
    used_ips = set()
    for peer in cfg.peer.values():
        used_ips.add(peer['AllowedIPs'].split('/')[0])  # убираем маску, если есть

    # Если IP для клиента задан вручную, проверяем его
    if args.ipaddr:
        ipaddr = args.ipaddr
        if ipaddr in used_ips:
            raise RuntimeError(f'ERROR: IP-addr "{ipaddr}" already used!')
    else:
        # Берём базовую подсеть из серверного конфига
        base_network = srv['Address']  # например, "10.9.9.1/22"
        ipaddr = find_free_ip(used_ips, base_network)

    priv_key, pub_key = gen_pair_keys()
    with open(get_main_config_path(), 'rb') as file:
        srvcfg = file.read().decode('utf8')

    # Формируем новую запись для клиента
    srvcfg += f'\n[Peer]\n'
    srvcfg += f'#_Name = {c_name}\n'
    srvcfg += f'#_GenKeyTime = {datetime.datetime.now().isoformat()}\n'
    srvcfg += f'#_PrivateKey = {priv_key}\n'
    srvcfg += f'PublicKey = {pub_key}\n'
    srvcfg += f'AllowedIPs = {ipaddr}/32\n'

    with open(get_main_config_path(), 'w', newline='\n') as file:
        file.write(srvcfg)

    print(f'New client "{c_name}" added! IP-Addr: "{ipaddr}"')

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment