-
-
Save remittor/8c3d9ff293b2ba4b13c367cc1a69f9eb to your computer and use it in GitHub Desktop.
# SPDX-License-Identifier: MIT | |
# Author: remittor <[email protected]> | |
# Created: 2024 | |
import os | |
import sys | |
import glob | |
import subprocess | |
import optparse | |
import random | |
import datetime | |
g_main_config_src = '.main.config' | |
g_main_config_fn = None | |
g_main_config_type = None | |
g_defclient_config_fn = "_defclient.config" | |
parser = optparse.OptionParser("usage: %prog [options]") | |
parser.add_option("-t", "--tmpcfg", dest="tmpcfg", default = g_defclient_config_fn) | |
parser.add_option("-c", "--conf", dest="confgen", action="store_true", default = False) | |
parser.add_option("-q", "--qrcode", dest="qrcode", action="store_true", default = False) | |
parser.add_option("-a", "--add", dest="addcl", default = "") | |
parser.add_option("-u", "--update", dest="update", default = "") | |
parser.add_option("-d", "--delete", dest="delete", default = "") | |
parser.add_option("-i", "--ipaddr", dest="ipaddr", default = "") | |
parser.add_option("-p", "--port", dest="port", default = None, type = 'int') | |
parser.add_option("", "--make", dest="makecfg", default = "") | |
parser.add_option("", "--tun", dest="tun", default = "") | |
parser.add_option("", "--create", dest="create", action="store_true", default = False) | |
(opt, args) = parser.parse_args() | |
g_defserver_config = """ | |
[Interface] | |
#_GenKeyTime = <SERVER_KEY_TIME> | |
PrivateKey = <SERVER_PRIVATE_KEY> | |
#_PublicKey = <SERVER_PUBLIC_KEY> | |
Address = <SERVER_ADDR> | |
ListenPort = <SERVER_PORT> | |
Jc = <JC> | |
Jmin = <JMIN> | |
Jmax = <JMAX> | |
S1 = <S1> | |
S2 = <S2> | |
H1 = <H1> | |
H2 = <H2> | |
H3 = <H3> | |
H4 = <H4> | |
PostUp = iptables -A INPUT -p udp --dport <SERVER_PORT> -m conntrack --ctstate NEW -j ACCEPT --wait 10 --wait-interval 50; iptables -A FORWARD -i <SERVER_IFACE> -o <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; iptables -A FORWARD -i <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; iptables -t nat -A POSTROUTING -o <SERVER_IFACE> -j MASQUERADE --wait 10 --wait-interval 50; ip6tables -A FORWARD -i <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; ip6tables -t nat -A POSTROUTING -o <SERVER_IFACE> -j MASQUERADE --wait 10 --wait-interval 50 | |
PostDown = iptables -D INPUT -p udp --dport <SERVER_PORT> -m conntrack --ctstate NEW -j ACCEPT --wait 10 --wait-interval 50; iptables -D FORWARD -i <SERVER_IFACE> -o <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; iptables -D FORWARD -i <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; iptables -t nat -D POSTROUTING -o <SERVER_IFACE> -j MASQUERADE --wait 10 --wait-interval 50; ip6tables -D FORWARD -i <SERVER_TUN> -j ACCEPT --wait 10 --wait-interval 50; ip6tables -t nat -D POSTROUTING -o <SERVER_IFACE> -j MASQUERADE --wait 10 --wait-interval 50 | |
""" | |
g_defclient_config = """ | |
[Interface] | |
PrivateKey = <CLIENT_PRIVATE_KEY> | |
#_PublicKey = <CLIENT_PUBLIC_KEY> | |
Address = <CLIENT_TUNNEL_IP> | |
DNS = 8.8.8.8 | |
Jc = <JC> | |
Jmin = <JMIN> | |
Jmax = <JMAX> | |
S1 = <S1> | |
S2 = <S2> | |
H1 = <H1> | |
H2 = <H2> | |
H3 = <H3> | |
H4 = <H4> | |
[Peer] | |
AllowedIPs = 0.0.0.0/5, 8.0.0.0/7, 11.0.0.0/8, 12.0.0.0/6, 16.0.0.0/4, 32.0.0.0/3, 64.0.0.0/2, 128.0.0.0/3, 160.0.0.0/5, 168.0.0.0/6, 172.0.0.0/12, 172.32.0.0/11, 172.64.0.0/10, 172.128.0.0/9, 173.0.0.0/8, 174.0.0.0/7, 176.0.0.0/4, 192.0.0.0/9, 192.128.0.0/11, 192.160.0.0/13, 192.169.0.0/16, 192.170.0.0/15, 192.172.0.0/14, 192.176.0.0/12, 192.192.0.0/10, 193.0.0.0/8, 194.0.0.0/7, 196.0.0.0/6, 200.0.0.0/5, 208.0.0.0/4, 8.8.8.8/32 | |
Endpoint = <SERVER_ADDR>:<SERVER_PORT> | |
PersistentKeepalive = 60 | |
PublicKey = <SERVER_PUBLIC_KEY> | |
""" | |
class IPAddr(): | |
def __init__(self, ipaddr = None): | |
self.ip = [ 0, 0, 0, 0 ] | |
self.mask = None | |
if ipaddr: | |
self.init(ipaddr) | |
def init(self, ipaddr): | |
_ipaddr = ipaddr | |
if not ipaddr: | |
raise RuntimeError(f'ERROR: Incorrect IP-Addr: "{_ipaddr}"') | |
if ' ' in ipaddr: | |
raise RuntimeError(f'ERROR: Incorrect IP-Addr: "{_ipaddr}"') | |
if ',' in ipaddr: | |
raise RuntimeError(f'ERROR: Incorrect IP-Addr: "{_ipaddr}"') | |
self.ip = [ 0, 0, 0, 0 ] | |
self.mask = None | |
if '/' in ipaddr: | |
self.mask = int(ipaddr.split('/')[1]) | |
ipaddr = ipaddr.split('/')[0] | |
nlist = ipaddr.split('.') | |
if len(nlist) != 4: | |
raise RuntimeError(f'ERROR: Incorrect IP-addr: "{_ipaddr}"') | |
for n, num in enumerate(nlist): | |
self.ip[n] = int(num) | |
def __str__(self): | |
out = f'{self.ip[0]}.{self.ip[1]}.{self.ip[2]}.{self.ip[3]}' | |
if self.mask: | |
out += '/' + str(self.mask) | |
return out | |
class WGConfig(): | |
def __init__(self, filename = None): | |
self.lines = [ ] | |
self.iface = { } | |
self.peer = { } | |
self.idsline = { } | |
self.cfg_fn = None | |
if filename: | |
self.load(filename) | |
def load(self, filename): | |
self.cfg_fn = None | |
self.lines = [ ] | |
self.iface = { } | |
self.peer = { } | |
self.idsline = { } | |
with open(filename, 'r') as file: | |
lines = file.readlines() | |
iface = None | |
secdata = [ ] | |
secdata_item = None | |
secline = [ ] | |
secline_item = None | |
for n, line in enumerate(lines): | |
line = line.rstrip() | |
self.lines.append(line) | |
if line.strip() == '': | |
continue | |
if line.startswith(' ') and not line.strip().startswith('#'): | |
raise RuntimeError(f'ERROR_CFG: Incorrect line #{n} into config "{filename}"') | |
if line.startswith('#') and not line.startswith('#_'): | |
continue | |
if line.startswith('[') and line.endswith(']'): | |
section_name = line[1:-1] | |
if not section_name: | |
raise RuntimeError(f'ERROR_CFG: Incorrect section name: "{section_name}" (#{n+1})') | |
#print(secname) | |
secdata_item = { "_section_name": section_name.lower() } | |
secline_item = { "_section_name": n } | |
if section_name.lower() == 'interface': | |
if iface: | |
raise RuntimeError(f'ERROR_CFG: Found second section Interface in line #{n+1}') | |
iface = secdata_item | |
elif section_name.lower() == 'peer': | |
pass | |
else: | |
raise RuntimeError(f'ERROR_CFG: Found incorrect section "{section_name}" in line #{n+1}') | |
secdata.append(secdata_item) | |
secline.append(secline_item) | |
continue | |
if line.startswith('#_') and ' = ' in line: | |
line = line[2:] | |
if line.startswith('#'): | |
continue | |
if ' = ' not in line: | |
raise RuntimeError(f'ERROR_CFG: Incorrect line into config: "{line}" (#{n+1})') | |
xv = line.find(' = ') | |
if xv <= 0: | |
raise RuntimeError(f'ERROR_CFG: Incorrect line into config: "{line}" (#{n+1})') | |
vname = line[:xv].strip() | |
value = line[xv+3:].strip() | |
#print(f' "{vname}" = "{value}"') | |
if not secdata_item: | |
raise RuntimeError(f'ERROR_CFG: Parameter "{vname}" have unknown section! (#{n+1})') | |
section_name = secdata_item['_section_name'] | |
if vname in secdata_item: | |
raise RuntimeError(f'ERROR_CFG: Found duplicate of param "{vname}" into section "{section_name}" (#{n+1})') | |
secdata_item[vname] = value | |
secline_item[vname] = n | |
if not iface: | |
raise RuntimeError(f'ERROR_CFG: Cannot found section Interface!') | |
for i, item in enumerate(secdata): | |
line = secline[i] | |
peer_name = "" | |
if item['_section_name'] == 'interface': | |
self.iface = item | |
peer_name = "__this_server__" | |
if 'PublicKey' not in item: | |
raise RuntimeError(f'ERROR_CFG: Cannot found PublicKey in Interface') | |
if 'PrivateKey' not in item: | |
raise RuntimeError(f'ERROR_CFG: Cannot found PrivateKey in Interface') | |
else: | |
if 'Name' in item: | |
peer_name = item['Name'] | |
if not peer_name: | |
raise RuntimeError(f'ERROR_CFG: Invalid peer Name in line #{line["Name"]}') | |
elif 'PublicKey' in item: | |
peer_name = item['PublicKey'] | |
if not peer_name: | |
raise RuntimeError(f'ERROR_CFG: Invalid peer PublicKey in line #{line["PublicKey"]}') | |
else: | |
raise RuntimeError(f'ERROR_CFG: Invalid peer data in line #{line["_section_name"]}') | |
if 'AllowedIPs' not in item: | |
raise RuntimeError(f'ERROR_CFG: Cannot found "AllowedIPs" into peer "{peer_name}"') | |
if peer_name in self.peer: | |
raise RuntimeError(f'ERROR_CFG: Found duplicate peer with name "{peer_name}"') | |
self.peer[peer_name] = item | |
if peer_name in self.idsline: | |
raise RuntimeError(f'ERROR_CFG: Found duplicate peer with name "{peer_name}"') | |
min_line = line['_section_name'] | |
max_line = min_line | |
self.idsline[f'{peer_name}'] = min_line | |
for vname in item: | |
self.idsline[f'{peer_name}|{vname}'] = line[vname] | |
if line[vname] > max_line: | |
max_line = line[vname] | |
item['_lines_range'] = ( min_line, max_line ) | |
self.cfg_fn = filename | |
return len(self.peer) | |
def save(self, filename = None): | |
if not filename: | |
filename = self.cfg_fn | |
if not self.lines: | |
raise RuntimeError(f'ERROR: no data') | |
with open(filename, 'w', newline = '\n') as file: | |
for line in self.lines: | |
file.write(line + '\n') | |
def del_client(self, c_name): | |
if c_name not in self.peer: | |
raise RuntimeError(f'ERROR: Not found client "{c_name}" in peer list!') | |
client = self.peer[c_name] | |
ipaddr = client['AllowedIPs'] | |
min_line, max_line = client['_lines_range'] | |
del self.lines[min_line:max_line+1] | |
del self.peer[c_name] | |
secsize = max_line - min_line + 1 | |
del_list = [ ] | |
for k, v in self.idsline.items(): | |
if v >= min_line and v <= max_line: | |
del_list.append(k) | |
elif v > max_line: | |
self.idsline[k] = v - secsize | |
for k in del_list: | |
del self.idsline[k] | |
return ipaddr | |
def set_param(self, c_name, param_name, param_value, force = False, offset = 0): | |
if c_name not in self.peer: | |
raise RuntimeError(f'ERROR: Not found client "{c_name}" in peer list!') | |
line_prefix = "" | |
if param_name.startswith('_'): | |
line_prefix = "#_" | |
param_name = param_name[1:] | |
client = self.peer[c_name] | |
min_line, max_line = client['_lines_range'] | |
if param_name in client: | |
nline = self.idsline[f'{c_name}|{param_name}'] | |
line = self.lines[nline] | |
if line.startswith('#_'): | |
line_prefix = "#_" | |
self.lines[nline] = f'{line_prefix}{param_name} = {param_value}' | |
return | |
if not force: | |
raise RuntimeError(f'ERROR: Param "{param_name}" not found for client "{c_name}"') | |
new_line = f'{line_prefix}{param_name} = {param_value}' | |
client[param_name] = param_value | |
secsize = max_line - min_line + 1 | |
if offset >= secsize: | |
raise RuntimeError(f'ERROR: Incorrect offset value = {offset} (secsize = {secsize})') | |
pos = max_line + 1 if offset <= 0 else min_line + offset | |
for k, v in self.idsline.items(): | |
if v >= pos: | |
self.idsline[k] = v + 1 | |
self.idsline[f'{c_name}|{param_name}'] = pos | |
self.lines.insert(pos, new_line) | |
return | |
def exec_cmd(cmd, input = None, shell = True, check = True, timeout = None): | |
proc = subprocess.run(cmd, input = input, shell = shell, check = check, | |
timeout = timeout, encoding = 'utf8', | |
stdout = subprocess.PIPE, stderr = subprocess.STDOUT) | |
rc = proc.returncode | |
out = proc.stdout | |
return rc, out | |
def get_main_iface(): | |
rc, out = exec_cmd('ip link show') | |
if rc: | |
raise RuntimeError(f'ERROR: Cannot get net interfaces') | |
for line in out.split('\n'): | |
if '<BROADCAST' in line and 'state UP' in line: | |
xv = line.split(':') | |
return xv[1].strip() | |
return None | |
def get_ext_ipaddr(): | |
rc, out = exec_cmd('curl -4 -s icanhazip.com') | |
if rc: | |
raise RuntimeError(f'ERROR: Cannot get ext IP-Addr') | |
lines = out.split('\n') | |
ipaddr = lines[-1] if lines[-1] else lines[-2] | |
ipaddr = IPAddr(ipaddr) | |
return str(ipaddr) | |
def gen_pair_keys(cfg_type = None): | |
global g_main_config_type | |
if sys.platform == 'win32': | |
return 'client_priv_key', 'client_pub_key' | |
if not cfg_type: | |
cfg_type = g_main_config_type | |
if not cfg_type: | |
raise | |
wgtool = cfg_type.lower() | |
rc, out = exec_cmd(f'{wgtool} genkey') | |
if rc: | |
raise RuntimeError(f'ERROR: Cannot generate private key') | |
priv_key = out.strip() | |
if not priv_key: | |
raise RuntimeError(f'ERROR: Cannot generate private Key') | |
rc, out = exec_cmd(f'{wgtool} pubkey', input = priv_key + '\n') | |
if rc: | |
raise RuntimeError(f'ERROR: Cannot generate public key') | |
pub_key = out.strip() | |
if not pub_key: | |
raise RuntimeError(f'ERROR: Cannot generate public Key') | |
return priv_key, pub_key | |
def get_main_config_path(check = True): | |
global g_main_config_fn | |
global g_main_config_type | |
if not os.path.exists(g_main_config_src): | |
raise RuntimeError(f'ERROR: file "{g_main_config_src}" not found!') | |
with open(g_main_config_src, 'r') as file: | |
lines = file.readlines() | |
g_main_config_fn = lines[0].strip() | |
cfg_exists = os.path.exists(g_main_config_fn) | |
g_main_config_type = 'WG' | |
if os.path.basename(g_main_config_fn).startswith('a'): | |
g_main_config_type = 'AWG' | |
if check: | |
if not cfg_exists: | |
raise RuntimeError(f'ERROR: Main {g_main_config_type} config file "{g_main_config_fn}" not found!') | |
return g_main_config_fn | |
# ------------------------------------------------------------------------------------- | |
if opt.makecfg: | |
g_main_config_fn = opt.makecfg | |
if os.path.exists(g_main_config_fn): | |
raise RuntimeError(f'ERROR: file "{g_main_config_fn}" already exists!') | |
m_cfg_type = 'WG' | |
if os.path.basename(g_main_config_fn).startswith('a'): | |
m_cfg_type = 'AWG' | |
print(f'Make {m_cfg_type} server config: "{g_main_config_fn}"...') | |
main_iface = get_main_iface() | |
if not main_iface: | |
raise RuntimeError(f'ERROR: Cannot get main network interface!') | |
print(f'Main network iface: "{main_iface}"') | |
if opt.port <= 1000 or opt.port > 65530: | |
raise RuntimeError(f'ERROR: Incorrect argument port = {opt.port}') | |
if not opt.ipaddr: | |
raise RuntimeError(f'ERROR: Incorrect argument ipaddr = "{opt.ipaddr}"') | |
ipaddr = IPAddr(opt.ipaddr) | |
if not ipaddr.mask: | |
raise RuntimeError(f'ERROR: Incorrect argument ipaddr = "{opt.ipaddr}"') | |
if opt.tun: | |
tun_name = opt.tun | |
else: | |
cfg_name = os.path.basename(g_main_config_fn) | |
tun_name = os.path.splitext(cfg_name)[0].strip() | |
print(f'Tunnel iface: "{tun_name}"') | |
priv_key, pub_key = gen_pair_keys(m_cfg_type) | |
random.seed() | |
jc = random.randint(3, 127) | |
jmin = random.randint(3, 700) | |
jmax = random.randint(jmin+1, 1270) | |
out = g_defserver_config | |
out = out.replace('<SERVER_KEY_TIME>', datetime.datetime.now().isoformat()) | |
out = out.replace('<SERVER_PRIVATE_KEY>', priv_key) | |
out = out.replace('<SERVER_PUBLIC_KEY>', pub_key) | |
out = out.replace('<SERVER_ADDR>', str(ipaddr)) | |
out = out.replace('<SERVER_PORT>', str(opt.port)) | |
if m_cfg_type == 'AWG': | |
out = out.replace('<JC>', str(jc)) | |
out = out.replace('<JMIN>', str(jmin)) | |
out = out.replace('<JMAX>', str(jmax)) | |
out = out.replace('<S1>', str(random.randint(3, 127))) | |
out = out.replace('<S2>', str(random.randint(3, 127))) | |
out = out.replace('<H1>', str(random.randint(0x10000011, 0x7FFFFF00))) | |
out = out.replace('<H2>', str(random.randint(0x10000011, 0x7FFFFF00))) | |
out = out.replace('<H3>', str(random.randint(0x10000011, 0x7FFFFF00))) | |
out = out.replace('<H4>', str(random.randint(0x10000011, 0x7FFFFF00))) | |
else: | |
out = out.replace('\nJc = <' , '\n# ') | |
out = out.replace('\nJmin = <', '\n# ') | |
out = out.replace('\nJmax = <', '\n# ') | |
out = out.replace('\nS1 = <' , '\n# ') | |
out = out.replace('\nS2 = <' , '\n# ') | |
out = out.replace('\nH1 = <' , '\n# ') | |
out = out.replace('\nH2 = <' , '\n# ') | |
out = out.replace('\nH3 = <' , '\n# ') | |
out = out.replace('\nH4 = <' , '\n# ') | |
out = out.replace('<SERVER_IFACE>', main_iface) | |
out = out.replace('<SERVER_TUN>', tun_name) | |
with open(g_main_config_fn, 'w', newline = '\n') as file: | |
file.write(out) | |
print(f'{m_cfg_type} server config file "{g_main_config_fn}" created!') | |
with open(g_main_config_src, 'w', newline = '\n') as file: | |
file.write(g_main_config_fn) | |
sys.exit(0) | |
# ------------------------------------------------------------------------------------- | |
get_main_config_path(check = True) | |
if opt.create: | |
if os.path.exists(opt.tmpcfg): | |
raise RuntimeError(f'ERROR: file "{opt.tmpcfg}" already exists!') | |
print(f'Create template for client configs: "{opt.tmpcfg}"...') | |
os.remove(opt.tmpcfg) if os.path.exists(opt.tmpcfg) else None | |
if opt.ipaddr: | |
ipaddr = opt.ipaddr | |
else: | |
ext_ipaddr = get_ext_ipaddr() | |
print(f'External IP-Addr: "{ext_ipaddr}"') | |
ipaddr = ext_ipaddr | |
ipaddr = IPAddr(ipaddr) | |
if ipaddr.mask: | |
raise RuntimeError(f'ERROR: Incorrect argument ipaddr = "{opt.ipaddr}"') | |
print(f'Server IP-Addr: "{ipaddr}"') | |
out = g_defclient_config | |
out = out.replace('<SERVER_ADDR>', str(ipaddr)) | |
if g_main_config_type != 'AWG': | |
out = out.replace('\nJc = <' , '\n# ') | |
out = out.replace('\nJmin = <', '\n# ') | |
out = out.replace('\nJmax = <', '\n# ') | |
out = out.replace('\nS1 = <' , '\n# ') | |
out = out.replace('\nS2 = <' , '\n# ') | |
out = out.replace('\nH1 = <' , '\n# ') | |
out = out.replace('\nH2 = <' , '\n# ') | |
out = out.replace('\nH3 = <' , '\n# ') | |
out = out.replace('\nH4 = <' , '\n# ') | |
with open(opt.tmpcfg, 'w', newline = '\n') as file: | |
file.write(out) | |
print(f'Template client config file "{opt.tmpcfg}" created!') | |
sys.exit(0) | |
# ------------------------------------------------------------------------------------- | |
xopt = [ opt.addcl, opt.update, opt.delete ] | |
copt = [ x for x in xopt if len(x) > 0 ] | |
if copt and len(copt) >= 2: | |
raise RuntimeError(f'ERROR: Incorrect arguments! Too many actions!') | |
if opt.addcl: | |
cfg = WGConfig(g_main_config_fn) | |
srv = cfg.iface | |
c_name = opt.addcl | |
print(f'Add new client config "{c_name}"...') | |
max_addr = None | |
for peer_name, peer in cfg.peer.items(): | |
if peer_name.lower() == c_name.lower(): | |
raise RuntimeError(f'ERROR: peer with name "{c_name}" already exists!') | |
if opt.ipaddr: | |
addr = IPAddr(opt.ipaddr) | |
addr.mask = None | |
addr = str(addr) | |
p_addr = IPAddr(peer['AllowedIPs']) | |
p_addr.mask = None | |
p_addr = str(p_addr) | |
if addr == p_addr: | |
raise RuntimeError(f'ERROR: IP-addr "{opt.ipaddr}" already used!') | |
addr = IPAddr(peer['AllowedIPs']) | |
if not max_addr or addr.ip[3] > max_addr.ip[3]: | |
max_addr = addr | |
priv_key, pub_key = gen_pair_keys() | |
with open(g_main_config_fn, 'rb') as file: | |
srvcfg = file.read() | |
srvcfg = srvcfg.decode('utf8') | |
if opt.ipaddr: | |
ipaddr = opt.ipaddr | |
else: | |
if max_addr is None: | |
max_addr = IPAddr(srv['Address']) | |
max_addr.ip[3] += 1 | |
max_addr.mask = 32 | |
ipaddr = str(max_addr) | |
else: | |
max_addr.ip[3] += 1 | |
ipaddr = str(max_addr) | |
if max_addr.ip[3] >= 254: | |
raise RuntimeError(f'ERROR: There are no more free IP-addresses') | |
srvcfg += f'\n' | |
srvcfg += f'[Peer]\n' | |
srvcfg += f'#_Name = {c_name}\n' | |
srvcfg += f'#_GenKeyTime = {datetime.datetime.now().isoformat()}\n' | |
srvcfg += f'#_PrivateKey = {priv_key}\n' | |
srvcfg += f'PublicKey = {pub_key}\n' | |
srvcfg += f'AllowedIPs = {ipaddr}\n' | |
with open(g_main_config_fn, 'w', newline = '\n') as file: | |
file.write(srvcfg) | |
print(f'New client "{c_name}" added! IP-Addr: "{ipaddr}"') | |
if opt.update: | |
cfg = WGConfig(g_main_config_fn) | |
p_name = opt.update | |
print(f'Update keys for client "{p_name}"...') | |
priv_key, pub_key = gen_pair_keys() | |
cfg.set_param(p_name, '_PrivateKey', priv_key, force = True, offset = 2) | |
cfg.set_param(p_name, 'PublicKey', pub_key) | |
gentime = datetime.datetime.now().isoformat() | |
cfg.set_param(p_name, '_GenKeyTime', gentime, force = True, offset = 2) | |
ipaddr = cfg.peer[p_name]['AllowedIPs'] | |
cfg.save() | |
print(f'Keys for client "{p_name}" updated! IP-Addr: "{ipaddr}"') | |
if opt.delete: | |
cfg = WGConfig(g_main_config_fn) | |
p_name = opt.delete | |
print(f'Delete client "{p_name}"...') | |
ipaddr = cfg.del_client(p_name) | |
cfg.save() | |
print(f'Client "{p_name}" deleted! IP-Addr: "{ipaddr}"') | |
if opt.confgen: | |
cfg = WGConfig(g_main_config_fn) | |
srv = cfg.iface | |
print('Generate client configs...') | |
if not os.path.exists(opt.tmpcfg): | |
raise RuntimeError(f'ERROR: file "{opt.tmpcfg}" not found!') | |
with open(opt.tmpcfg, 'r') as file: | |
tmpcfg = file.read() | |
flst = glob.glob("*.conf") | |
for fn in flst: | |
if fn.endswith('awg0.conf'): | |
continue | |
if os.path.exists(fn): | |
os.remove(fn) | |
flst = glob.glob("*.png") | |
for fn in flst: | |
if os.path.exists(fn): | |
os.remove(fn) | |
random.seed() | |
for peer_name, peer in cfg.peer.items(): | |
if 'Name' not in peer or 'PrivateKey' not in peer: | |
print(f'Skip peer with pubkey "{peer["PublicKey"]}"') | |
continue | |
jc = random.randint(3, 127) | |
jmin = random.randint(3, 700) | |
jmax = random.randint(jmin+1, 1270) | |
out = tmpcfg[:] | |
out = out.replace('<CLIENT_PRIVATE_KEY>', peer['PrivateKey']) | |
out = out.replace('<CLIENT_PUBLIC_KEY>', peer['PublicKey']) | |
out = out.replace('<CLIENT_TUNNEL_IP>', peer['AllowedIPs']) | |
out = out.replace('<JC>', str(jc)) | |
out = out.replace('<JMIN>', str(jmin)) | |
out = out.replace('<JMAX>', str(jmax)) | |
out = out.replace('<S1>', srv['S1']) | |
out = out.replace('<S2>', srv['S2']) | |
out = out.replace('<H1>', srv['H1']) | |
out = out.replace('<H2>', srv['H2']) | |
out = out.replace('<H3>', srv['H3']) | |
out = out.replace('<H4>', srv['H4']) | |
out = out.replace('<SERVER_PORT>', srv['ListenPort']) | |
out = out.replace('<SERVER_PUBLIC_KEY>', srv['PublicKey']) | |
fn = f'{peer_name}.conf' | |
with open(fn, 'w', newline = '\n') as file: | |
file.write(out) | |
if opt.qrcode: | |
print('Generate QR codes...') | |
flst = glob.glob("*.png") | |
for fn in flst: | |
if os.path.exists(fn): | |
os.remove(fn) | |
flst = glob.glob("*.conf") | |
if not flst: | |
raise RuntimeError(f'ERROR: client configs not founded!') | |
import qrcode | |
for fn in flst: | |
if fn.endswith('awg0.conf'): | |
continue | |
with open(fn, 'rb') as file: | |
conf = file.read() | |
conf = conf.decode('utf8') | |
name = os.path.splitext(fn)[0] | |
img = qrcode.make(conf) | |
img.save(f'{name}.png') | |
print('===== OK =====') | |
Здравствуйте, хочу использовать ваш скрипт в своем проекте. Не моли бы вы лицензировать его под пермиссивной или копилефтной лицензией (MIT, BSD, GPL, да хоть AGPL)?
Скажите, а PresharedKey не предусмотрен?
Скажите, а PresharedKey не предусмотрен?
Не предусмотрел, т.к. не разобрался для чего он нужен.
Строка 334:
rc, out = exec_cmd('curl -4 icanhazip.com')
Желательно заменить на:
rc, out = exec_cmd('curl -4 -s icanhazip.com')
Иначе функция ломается при запуске скрипта в контейнере.
Привет!
Почему то конфиги не хотят работать на мобильном интернете, но корректно работают под wifi. Пробовал устанавливать на разные vps - результат одинаков
Подскажите, с чем это может быть связяно.
При этом если создаю конфиг для AWG на тех же vps через приложение AmneziaVPN - всё ок
подскажите, как создать шаблон конфигов, если eth0 не имеет IPv4 адреса, а VPS находится за NAT с публичным IPv4 и предоткрытими портами?
@Vitaly-ZS
Если я правильно проблему, то вам поможет запуск такой команды:
curl -s ipinfo.io/ip
или такой:
curl -s ifconfig.me/ip
Привет, @karavan
спасибо, что откликнулся.
возможно я не достаточно подробно описал проблему
я иду по флоу и запускаю команду:
# python3 awgcfg.py --create
Create template for client configs: "_defclient.config"...
Traceback (most recent call last):
File "/root/awg/awgcfg.py", line 490, in <module>
ext_ipaddr = get_ext_ipaddr()
File "/root/awg/awgcfg.py", line 334, in get_ext_ipaddr
rc, out = exec_cmd('curl -4 -s icanhazip.com')
File "/root/awg/awgcfg.py", line 314, in exec_cmd
proc = subprocess.run(cmd, input = input, shell = shell, check = check,
File "/usr/lib/python3.10/subprocess.py", line 526, in run
raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command 'curl -4 -s icanhazip.com' returned non-zero exit status 7.
полагаю, что проблема в том, что в моей конфигурации VPS нет выделенного IPv4, как видно из:
# ip a
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1400 qdisc fq_codel state UP group default qlen 1000
link/ether yyyyyyy brd ff:ff:ff:ff:ff:ff
altname enp0s3
inet6 xxxxxx/64 scope global
valid_lft forever preferred_lft forever
inet6 zzzzzzzz/64 scope link
valid_lft forever preferred_lft forever
но поставщик VPS предоставил мне публичный NAT IPv4 и десяток портов, которые переадресовывают на мой VPS.
Вопрос, как мне создать _defclient.config, если скрипт не может получить адреса? Как вручную указать нужный IP?
@Vitaly-ZS
26-я строчка скрипта указывает параметр, через который можно передать IP сервера
@pashakrapivin Здравствуйте! У меня была такая же проблема на МТС. Попробуйте уменьшить значения Jc/Jmin/Jmax и на сервере, и на клиенте. Например, взять Jc=4, Jmin=10, Jmax=50. Скорее всего, что-то близкое задано и в AmneziaVPN, там можно посмотреть значения параметров. Вот ещё здесь ту же проблему обсуждают: amnezia-vpn/amnezia-client#1010
@pashakrapivin Здравствуйте! У меня была такая же проблема на МТС. Попробуйте уменьшить значения Jc/Jmin/Jmax и на сервере, и на клиенте. Например, взять Jc=4, Jmin=10, Jmax=50. Скорее всего, что-то близкое задано и в AmneziaVPN, там можно посмотреть значения параметров. Вот ещё здесь ту же проблему обсуждают: amnezia-vpn/amnezia-client#1010
Спасибо большое, это сработало. Подправил на
random.seed()
jc = random.randint(3, 10)
jmin = random.randint(3, 20)
jmax = random.randint(jmin+1, 50)
AmneziaVPN ставит значения Jc = 3, Jmin = 10, Jmax = 50
Вы в скрипте во-первых забыли IPv6 в AllowedIPs поставить, будет его утечка
Во-вторых достаточно 0.0.0.0/0 и ::/0 поставить и всё равно будет правильно работать
А зачем 8.8.8.8/32 в allowed-ips? Вы собираетесь клиент разворачивать на гугловских серверах?
Все работает, только к автору несколько вопросов?
- Почему в клиентском AllowedIPs (строка 73) именно такие адреса? По какому принципу они отбирались. По дефолту же 0.0.0.0/0, ::0 должно быть, чтоб весь трафик залетал в туннель.
- Почему DNS 8.8.8.8 именно гугловский? Я понимаю что все эти значения можно изменять кому как нравится, но просто интересно почему автор выбрал именно этот DNS, может там своя философия?
- Jc Jmin Jmax S1 S2 H1 H2 H3 H4 Эти значения должны быть на сервере и на клиенте одинаковые? А если я раздам всем своим друзьям конфиги клиентские и у кого-то не будет работать как у парней выше провайдер МТС. Нужно будет изменять значения на сервере и снова раскидывать новые клиентские конфиги друзьям?
Отвечу вместо автора:
- AllowedIPs выбраны так, чтобы исключить немаршрутизируемые диапазоны 10., 172..., 192.168., чтобы продолжала работать локалка. Исключить их не так просто, поэтому такой выбор, как и в оригинальном WG. По-моему, это глючит.
- Гугловский DNS оч хороший и распределён географически
- Одинаковые. Если изменить значения, нужно будет их менять и у клиентов.
Подправил на
random.seed() jc = random.randint(3, 10) jmin = random.randint(3, 20) jmax = random.randint(jmin+1, 50)
Это нужно сделать в двух местах, код повторяется.
Можно заменить код, чтобы "клиенты" занимали все свободные ip адреса в конфиге, а не только по порядку.
Этот кусок
if opt.ipaddr:
ipaddr = opt.ipaddr
else:
if max_addr is None:
max_addr = IPAddr(srv['Address'])
max_addr.ip[3] += 1
max_addr.mask = 32
ipaddr = str(max_addr)
else:
max_addr.ip[3] += 1
ipaddr = str(max_addr)
if max_addr.ip[3] >= 254:
raise RuntimeError(f'ERROR: There are no more free IP-addresses')
На вот этот
if opt.ipaddr:
ipaddr = opt.ipaddr
else:
server_ip = IPAddr(srv['Address'])
start_octet = server_ip.ip[3] + 1
if start_octet >= 254:
raise RuntimeError("ERROR: Нет свободных IP-адресов для клиентов (начальный адрес слишком высокий)")
used_octets = set()
for peer_name, peer in cfg.peer.items():
peer_ip = IPAddr(peer['AllowedIPs'])
used_octets.add(peer_ip.ip[3])
free_octet = None
for octet in range(start_octet, 254):
if octet not in used_octets:
free_octet = octet
break
if free_octet is None:
raise RuntimeError("ERROR: Нет свободных IP-адресов для клиентов")
new_ip = f"{server_ip.ip[0]}.{server_ip.ip[1]}.{server_ip.ip[2]}.{free_octet}"
ipaddr = new_ip + '/32'
По подсказкам нейросети.
Можно "закольцевать" выдачу IP.
Вместо +1 к уже выданным IP (прописанным в awg0.conf) и ошибке при достижении финального октета, скрипт будет искать ближайший свободный от первого IP и только если вообще все IP заняты, выдаст ошибку.
1. Импортируйте модуль ipaddress
import ipaddress
2. Добавьте функцию поиска свободного IP
def find_free_ip(used_ips, base_network):
"""
Ищет первый свободный IP-адрес в подсети, заданной base_network,
исключая уже использованные адреса из множества used_ips.
"""
net = ipaddress.ip_network(base_network, strict=False)
for ip in net.hosts():
if str(ip) not in used_ips:
return str(ip)
raise RuntimeError("ERROR: Нет свободных IP-адресов в сети")
3. Измените функцию add_client
def add_client(args):
cfg = WGConfig(get_main_config_path())
srv = cfg.iface
c_name = args.addcl
print(f'Add new client config "{c_name}"...')
# Проверяем, что имя клиента уникально
for peer_name in cfg.peer.keys():
if peer_name.lower() == c_name.lower():
raise RuntimeError(f'ERROR: peer with name "{c_name}" already exists!')
# Собираем уже использованные IP-адреса
used_ips = set()
for peer in cfg.peer.values():
used_ips.add(peer['AllowedIPs'].split('/')[0]) # убираем маску, если есть
# Если IP для клиента задан вручную, проверяем его
if args.ipaddr:
ipaddr = args.ipaddr
if ipaddr in used_ips:
raise RuntimeError(f'ERROR: IP-addr "{ipaddr}" already used!')
else:
# Берём базовую подсеть из серверного конфига
base_network = srv['Address'] # например, "10.9.9.1/22"
ipaddr = find_free_ip(used_ips, base_network)
priv_key, pub_key = gen_pair_keys()
with open(get_main_config_path(), 'rb') as file:
srvcfg = file.read().decode('utf8')
# Формируем новую запись для клиента
srvcfg += f'\n[Peer]\n'
srvcfg += f'#_Name = {c_name}\n'
srvcfg += f'#_GenKeyTime = {datetime.datetime.now().isoformat()}\n'
srvcfg += f'#_PrivateKey = {priv_key}\n'
srvcfg += f'PublicKey = {pub_key}\n'
srvcfg += f'AllowedIPs = {ipaddr}/32\n'
with open(get_main_config_path(), 'w', newline='\n') as file:
file.write(srvcfg)
print(f'New client "{c_name}" added! IP-Addr: "{ipaddr}"')
Пример использования:
Перед поднятием интерфейса awg0 создаём основной конфиг (сервер будет прослушивать UDP порт 49666):
Примечание: Если вы используете не AWG, а обычный WG, то путь к конфигу должен быть таким:
/etc/wireguard/wg0.conf
Создаём шаблон для клиентских конфигов:
Добавляем в основной конфиг два клиента с именами "my_phone" и "my_router":
Удаляем из основного конфига клиента с именем "my_phone":
Перегенерируем пару ключей у клиента с именем "my_router":
Создаём в текущей директории полный список конфигов для всех клиентов, а так же QR-коды для каждого (png-файлы):
Примечание: после выполнения команд, которые изменяют конфиг WG, нужно вручную перезапускать awg0 (или wg0) интерфейс (либо перезапускать сервис, который его поднимает).
Примечание: последнюю версию скрипта можно скачать такой командой: