Skip to content

Instantly share code, notes, and snippets.

@rooty
Last active June 10, 2026 10:28
Show Gist options
  • Select an option

  • Save rooty/53d6f1c51ba1711c2198d40e5afbfca2 to your computer and use it in GitHub Desktop.

Select an option

Save rooty/53d6f1c51ba1711c2198d40e5afbfca2 to your computer and use it in GitHub Desktop.
Zabbix export-import templates. version 6.4+

Что делает скрипт templates-export.py:

Авторизуется и получает токен Запрашивает все шаблоны (template.get) — id и name Итерируется по каждому и экспортирует отдельным файлом Имя файла формируется как ИМЯ_ШАБЛОНА__idXXX.xml Делает логаут по завершении

Перед запуском замени в начале скрипта:

OLD_ZABBIX — адрес старого сервера ZABBIX_USER / ZABBIX_PASS — логин/пароль FORMAT — xml или yaml (для Zabbix 6+)

Что делает скрипт templates-import.py:

Читает все .xml файлы из папки ./zabbix_templates Импортирует каждый через configuration.import с правилами — создать если нет, обновить если есть Пропускает пустые файлы В конце показывает список файлов которые не удалось импортировать с причиной ошибки

Перед запуском замени NEW_ZABBIX, ZABBIX_USER, ZABBIX_PASS на реальные данные нового сервера.

Что делает скриптhostgroups-migrate.py:

Подключается к обоим серверам одновременно — не нужны промежуточные файлы Получает все группы с источника Проверяет какие уже есть на новом сервере — не дублирует Создаёт только недостающие группы Показывает итоговую статистику с ошибками

Что переносит скрипт для каждого хоста hosts-migrate.py:

Интерфейсы (Agent, SNMP, IPMI, JMX) с деталями Host groups — создаёт недостающие автоматически Привязанные шаблоны — ищет по имени на новом сервере Макросы и теги Инвентарь IPMI настройки TLS настройки Статус (monitored/unmonitored)

Что делает скрипт fix-templates.py:

Берёт все хосты со старого сервера через selectParentTemplates (правильный ключ) Проверяет какие шаблоны уже привязаны на новом — не дублирует Привязывает только недостающие шаблоны через host.update Предупреждает если шаблон не найден на новом сервере

Что делает скрипт proxy-migrate.py:

Переносит все прокси со старого сервера на новый Если прокси уже существует — обновляет его настройки Обрабатывает TLS PSK так же как в скрипте хостов — если ключ недоступен, сбрасывает на no encryption Автоматически привязывает хосты к соответствующим прокси на новом сервере Поддерживает оба формата API — Zabbix 6.4+ (proxyid, operating_mode) и старые версии (host, status, proxy_hostid)

Важно: после миграции прокси нужно на каждом прокси-сервере обновить адрес Zabbix сервера в конфиге /etc/zabbix/zabbix_proxy.conf — поменять Server= на адрес нового сервера и перезапустить сервис.

#!/usr/bin/env python3
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# === НАСТРОЙКИ ===
ZABBIX_SRC_URL = "https://OLD_ZABBIX/api_jsonrpc.php"
ZABBIX_DST_URL = "https://NEW_ZABBIX/api_jsonrpc.php"
ZABBIX_SRC_USER = "Admin"
ZABBIX_SRC_PASS = "zabbix"
ZABBIX_DST_USER = "Admin"
ZABBIX_DST_PASS = "zabbix"
VERIFY_SSL = False
# ── helpers ───────────────────────────────────────────────────────────────────
def make_session():
s = requests.Session()
s.verify = VERIFY_SSL
return s
def api_call(session, url, method, params, auth=None):
headers = {"Content-Type": "application/json"}
if auth:
headers["Authorization"] = f"Bearer {auth}"
resp = session.post(url, json={
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": 1,
}, headers=headers)
resp.raise_for_status()
data = resp.json()
if "error" in data:
raise RuntimeError(f"API error [{method}]: {data['error']}")
return data["result"]
def login(session, url, user, password):
try:
return api_call(session, url, "user.login",
{"username": user, "password": password})
except RuntimeError:
return api_call(session, url, "user.login",
{"user": user, "password": password})
def logout(session, url, token):
try:
api_call(session, url, "user.logout", [], auth=token)
except Exception:
pass
# ── main ──────────────────────────────────────────────────────────────────────
src = make_session()
dst = make_session()
# 1. Авторизация
print("Авторизация...")
src_token = login(src, ZABBIX_SRC_URL, ZABBIX_SRC_USER, ZABBIX_SRC_PASS)
dst_token = login(dst, ZABBIX_DST_URL, ZABBIX_DST_USER, ZABBIX_DST_PASS)
print(f" ✓ Источник: {ZABBIX_SRC_URL}")
print(f" ✓ Назначение: {ZABBIX_DST_URL}\n")
# 2. Получить все хосты с источника + их шаблоны
print("Получаю хосты и шаблоны с источника...")
src_hosts = api_call(src, ZABBIX_SRC_URL, "host.get", {
"output": ["hostid", "host"],
"selectParentTemplates": ["templateid", "name"],
}, auth=src_token)
# Оставляем только хосты у которых есть шаблоны
src_hosts = [h for h in src_hosts if h.get("parentTemplates")]
print(f" ✓ Хостов с шаблонами: {len(src_hosts)}\n")
# 3. Получить все хосты на dst — маппинг host → hostid
print("Получаю хосты на назначении...")
dst_hosts = api_call(dst, ZABBIX_DST_URL, "host.get", {
"output": ["hostid", "host"],
"selectParentTemplates": ["templateid", "name"],
}, auth=dst_token)
dst_host_map = {h["host"]: h["hostid"] for h in dst_hosts}
print(f" ✓ Хостов на dst: {len(dst_hosts)}\n")
# 4. Получить все шаблоны на dst — маппинг name → templateid
print("Получаю шаблоны на назначении...")
dst_templates = api_call(dst, ZABBIX_DST_URL, "template.get", {
"output": ["templateid", "name"],
}, auth=dst_token)
dst_tpl_map = {t["name"]: t["templateid"] for t in dst_templates}
print(f" ✓ Шаблонов на dst: {len(dst_templates)}\n")
# 5. Восстановить привязки
ok, skip, fail = 0, 0, 0
failed = []
print(f"{'─'*60}")
for i, host in enumerate(src_hosts, start=1):
hostname = host["host"]
print(f"[{i}/{len(src_hosts)}] {hostname}")
# Хост должен существовать на dst
if hostname not in dst_host_map:
print(f" ⚠ Хост не найден на dst, пропускаем")
skip += 1
continue
dst_hostid = dst_host_map[hostname]
# Получить уже привязанные шаблоны на dst для этого хоста
dst_host_data = next((h for h in dst_hosts if h["host"] == hostname), None)
already_linked = {t["name"] for t in dst_host_data.get("parentTemplates", [])}
# Собрать шаблоны для привязки
templates_to_link = []
missing_templates = []
for tpl in host["parentTemplates"]:
name = tpl["name"]
if name in already_linked:
continue
if name in dst_tpl_map:
templates_to_link.append({"templateid": dst_tpl_map[name]})
else:
missing_templates.append(name)
if missing_templates:
for m in missing_templates:
print(f" ⚠ Шаблон не найден на dst: {m}")
if not templates_to_link:
print(f" ⚠ Все шаблоны уже привязаны или отсутствуют на dst")
skip += 1
continue
try:
api_call(dst, ZABBIX_DST_URL, "host.update", {
"hostid": dst_hostid,
"templates": templates_to_link,
}, auth=dst_token)
names = [t["name"] for t in host["parentTemplates"]
if t["name"] in dst_tpl_map]
print(f" ✓ Привязано шаблонов: {len(templates_to_link)} ({', '.join(names)})")
ok += 1
except RuntimeError as e:
print(f" ✗ Ошибка: {e}")
failed.append((hostname, str(e)))
fail += 1
# 6. Логаут
logout(src, ZABBIX_SRC_URL, src_token)
logout(dst, ZABBIX_DST_URL, dst_token)
# 7. Итог
print(f"\n{'='*60}")
print(f" ✓ Обновлено: {ok}")
print(f" ⚠ Пропущено: {skip}")
print(f" ✗ Ошибок: {fail}")
print(f" Всего: {len(src_hosts)}")
if failed:
print(f"\nХосты с ошибками:")
for name, err in failed:
print(f" - {name}: {err}")
#!/usr/bin/env python3
import os
import json
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# === НАСТРОЙКИ ===
ZABBIX_SRC_URL = "https://OLD_ZABBIX/api_jsonrpc.php"
ZABBIX_DST_URL = "https://NEW_ZABBIX/api_jsonrpc.php"
ZABBIX_SRC_USER = "Admin"
ZABBIX_SRC_PASS = "zabbix"
ZABBIX_DST_USER = "Admin"
ZABBIX_DST_PASS = "zabbix"
VERIFY_SSL = False
# ── helpers ──────────────────────────────────────────────────────────────────
def make_session():
s = requests.Session()
s.verify = VERIFY_SSL
return s
def api_call(session, url, method, params, auth=None):
headers = {"Content-Type": "application/json"}
if auth:
headers["Authorization"] = f"Bearer {auth}"
resp = session.post(url, json={
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": 1,
}, headers=headers)
resp.raise_for_status()
data = resp.json()
if "error" in data:
raise RuntimeError(f"API error [{method}]: {data['error']}")
return data["result"]
def login(session, url, user, password):
try:
return api_call(session, url, "user.login",
{"username": user, "password": password})
except RuntimeError:
return api_call(session, url, "user.login",
{"user": user, "password": password})
def logout(session, url, token):
try:
api_call(session, url, "user.logout", [], auth=token)
except Exception:
pass
# ── main ──────────────────────────────────────────────────────────────────────
src = make_session()
dst = make_session()
# 1. Авторизация
print("Авторизация на источнике...")
src_token = login(src, ZABBIX_SRC_URL, ZABBIX_SRC_USER, ZABBIX_SRC_PASS)
print(f" ✓ {ZABBIX_SRC_URL}")
print("Авторизация на назначении...")
dst_token = login(dst, ZABBIX_DST_URL, ZABBIX_DST_USER, ZABBIX_DST_PASS)
print(f" ✓ {ZABBIX_DST_URL}\n")
# 2. Получить все host groups с источника
print("Получаю host groups с источника...")
src_groups = api_call(src, ZABBIX_SRC_URL, "hostgroup.get",
{"output": ["groupid", "name"]},
auth=src_token)
print(f" Найдено групп: {len(src_groups)}\n")
# 3. Получить уже существующие группы на назначении
existing = api_call(dst, ZABBIX_DST_URL, "hostgroup.get",
{"output": ["name"]},
auth=dst_token)
existing_names = {g["name"] for g in existing}
# 4. Создать недостающие группы
ok, skip, fail = 0, 0, 0
failed = []
print(f"{'─'*55}")
for i, group in enumerate(src_groups, start=1):
name = group["name"]
print(f"[{i}/{len(src_groups)}] {name}")
if name in existing_names:
print(f" ⚠ Уже существует, пропускаем")
skip += 1
continue
try:
result = api_call(dst, ZABBIX_DST_URL, "hostgroup.create",
{"name": name},
auth=dst_token)
print(f" ✓ Создана (id={result['groupids'][0]})")
ok += 1
except RuntimeError as e:
print(f" ✗ Ошибка: {e}")
failed.append((name, str(e)))
fail += 1
# 5. Логаут
logout(src, ZABBIX_SRC_URL, src_token)
logout(dst, ZABBIX_DST_URL, dst_token)
# 6. Итог
print(f"\n{'='*55}")
print(f" ✓ Создано: {ok}")
print(f" ⚠ Пропущено: {skip}")
print(f" ✗ Ошибок: {fail}")
print(f" Всего: {len(src_groups)}")
if failed:
print(f"\nГруппы с ошибками:")
for name, err in failed:
print(f" - {name}: {err}")
#!/usr/bin/env python3
import os
import json
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# === НАСТРОЙКИ ===
ZABBIX_SRC_URL = "https://OLD_ZABBIX/api_jsonrpc.php"
ZABBIX_DST_URL = "https://NEW_ZABBIX/api_jsonrpc.php"
ZABBIX_SRC_USER = "Admin"
ZABBIX_SRC_PASS = "zabbix"
ZABBIX_DST_USER = "Admin"
ZABBIX_DST_PASS = "zabbix"
VERIFY_SSL = False
# ── helpers ───────────────────────────────────────────────────────────────────
def make_session():
s = requests.Session()
s.verify = VERIFY_SSL
return s
def api_call(session, url, method, params, auth=None):
headers = {"Content-Type": "application/json"}
if auth:
headers["Authorization"] = f"Bearer {auth}"
resp = session.post(url, json={
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": 1,
}, headers=headers)
resp.raise_for_status()
data = resp.json()
if "error" in data:
raise RuntimeError(f"API error [{method}]: {data['error']}")
return data["result"]
def login(session, url, user, password):
try:
return api_call(session, url, "user.login",
{"username": user, "password": password})
except RuntimeError:
return api_call(session, url, "user.login",
{"user": user, "password": password})
def logout(session, url, token):
try:
api_call(session, url, "user.logout", [], auth=token)
except Exception:
pass
# ── main ──────────────────────────────────────────────────────────────────────
src = make_session()
dst = make_session()
# 1. Авторизация
print("Авторизация...")
src_token = login(src, ZABBIX_SRC_URL, ZABBIX_SRC_USER, ZABBIX_SRC_PASS)
print(f" ✓ Источник: {ZABBIX_SRC_URL}")
dst_token = login(dst, ZABBIX_DST_URL, ZABBIX_DST_USER, ZABBIX_DST_PASS)
print(f" ✓ Назначение: {ZABBIX_DST_URL}\n")
# 2. Получить все host groups с источника и создать маппинг name→id на dst
print("Синхронизирую host groups...")
src_groups = api_call(src, ZABBIX_SRC_URL, "hostgroup.get",
{"output": ["groupid", "name"]}, auth=src_token)
dst_groups = api_call(dst, ZABBIX_DST_URL, "hostgroup.get",
{"output": ["groupid", "name"]}, auth=dst_token)
dst_group_map = {g["name"]: g["groupid"] for g in dst_groups}
# Создаём недостающие группы
for g in src_groups:
if g["name"] not in dst_group_map:
result = api_call(dst, ZABBIX_DST_URL, "hostgroup.create",
{"name": g["name"]}, auth=dst_token)
dst_group_map[g["name"]] = result["groupids"][0]
print(f" + Группа создана: {g['name']}")
# Маппинг src groupid → dst groupid
src_group_map = {g["groupid"]: dst_group_map[g["name"]] for g in src_groups}
print(f" ✓ Групп всего: {len(src_groups)}\n")
# 3. Получить все шаблоны на dst — для маппинга по имени
print("Получаю шаблоны на назначении...")
dst_templates = api_call(dst, ZABBIX_DST_URL, "template.get",
{"output": ["templateid", "name"]}, auth=dst_token)
dst_tpl_map = {t["name"]: t["templateid"] for t in dst_templates}
print(f" ✓ Шаблонов на dst: {len(dst_templates)}\n")
# 4. Получить все хосты с источника со всеми нужными полями
print("Получаю хосты с источника...")
src_hosts = api_call(src, ZABBIX_SRC_URL, "host.get", {
"output": [
"hostid", "host", "name", "status",
"description", "ipmi_authtype", "ipmi_privilege",
"ipmi_username", "ipmi_password",
"tls_connect", "tls_accept",
"tls_issuer", "tls_subject",
"tls_psk_identity", "tls_psk",
],
"selectGroups": ["groupid", "name"],
"selectInterfaces": ["type", "main", "useip", "ip", "dns", "port", "details"],
"selectTemplates": ["templateid", "name"],
"selectMacros": ["macro", "value", "description", "type"],
"selectTags": ["tag", "value"],
"selectInventory": "extend",
}, auth=src_token)
print(f" ✓ Хостов найдено: {len(src_hosts)}\n")
# 5. Получить уже существующие хосты на dst
dst_hosts = api_call(dst, ZABBIX_DST_URL, "host.get",
{"output": ["host"]}, auth=dst_token)
dst_host_set = {h["host"] for h in dst_hosts}
# 6. Создать хосты
ok, skip, fail = 0, 0, 0
failed = []
print(f"{'─'*60}")
for i, host in enumerate(src_hosts, start=1):
hostname = host["host"]
print(f"[{i}/{len(src_hosts)}] {hostname} ({host['name']})")
if hostname in dst_host_set:
print(f" ⚠ Уже существует, пропускаем")
skip += 1
continue
try:
# Группы — переводим src groupid → dst groupid
groups = [
{"groupid": src_group_map[g["groupid"]]}
for g in host["groups"]
if g["groupid"] in src_group_map
]
if not groups:
raise ValueError("Нет подходящих групп на dst")
# Шаблоны — ищем по имени на dst
templates = []
for t in host.get("templates", []):
if t["name"] in dst_tpl_map:
templates.append({"templateid": dst_tpl_map[t["name"]]})
else:
print(f" ⚠ Шаблон не найден на dst: {t['name']}")
# Интерфейсы
interfaces = []
for iface in host.get("interfaces", []):
iface_obj = {
"type": iface["type"],
"main": iface["main"],
"useip": iface["useip"],
"ip": iface["ip"],
"dns": iface["dns"],
"port": iface["port"],
}
# SNMP details (type=2)
if iface.get("details"):
iface_obj["details"] = iface["details"]
interfaces.append(iface_obj)
# Инвентарь
inventory_mode = -1 # disabled
inventory = {}
if host.get("inventory"):
inventory_mode = 1
# убираем служебные поля
skip_fields = {"hostid", "inventory_mode"}
inventory = {k: v for k, v in host["inventory"].items()
if k not in skip_fields and v}
# TLS — если PSK но ключ пустой, сбрасываем на no encryption
tls_connect = host.get("tls_connect", "1")
tls_accept = host.get("tls_accept", "1")
psk_identity = host.get("tls_psk_identity", "")
psk = host.get("tls_psk", "")
PSK_FLAG = 2 # bit/value для PSK в Zabbix
if not psk_identity or not psk:
# PSK данные недоступны — откатываемся на no encryption
if str(tls_connect) == str(PSK_FLAG):
tls_connect = "1"
# tls_accept — битовая маска: убираем бит PSK (2)
try:
tls_accept_int = int(tls_accept) & ~PSK_FLAG
tls_accept = str(tls_accept_int) if tls_accept_int > 0 else "1"
except ValueError:
tls_accept = "1"
psk_identity = ""
psk = ""
params = {
"host": hostname,
"name": host["name"],
"status": host["status"],
"description": host.get("description", ""),
"groups": groups,
"interfaces": interfaces,
"templates": templates,
"macros": host.get("macros", []),
"tags": host.get("tags", []),
"inventory_mode": inventory_mode,
"inventory": inventory,
"tls_connect": tls_connect,
"tls_accept": tls_accept,
"tls_issuer": host.get("tls_issuer", ""),
"tls_subject": host.get("tls_subject", ""),
"ipmi_authtype": host.get("ipmi_authtype", -1),
"ipmi_privilege": host.get("ipmi_privilege", 2),
"ipmi_username": host.get("ipmi_username", ""),
"ipmi_password": host.get("ipmi_password", ""),
}
# Добавляем PSK только если данные есть
if psk_identity and psk:
params["tls_psk_identity"] = psk_identity
params["tls_psk"] = psk
result = api_call(dst, ZABBIX_DST_URL, "host.create",
params, auth=dst_token)
print(f" ✓ Создан (id={result['hostids'][0]})")
ok += 1
except (RuntimeError, ValueError) as e:
print(f" ✗ Ошибка: {e}")
failed.append((hostname, str(e)))
fail += 1
# 7. Логаут
logout(src, ZABBIX_SRC_URL, src_token)
logout(dst, ZABBIX_DST_URL, dst_token)
# 8. Итог
print(f"\n{'='*60}")
print(f" ✓ Создано: {ok}")
print(f" ⚠ Пропущено: {skip}")
print(f" ✗ Ошибок: {fail}")
print(f" Всего: {len(src_hosts)}")
if failed:
print(f"\nХосты с ошибками:")
for name, err in failed:
print(f" - {name}: {err}")
#!/usr/bin/env python3
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# === НАСТРОЙКИ ===
ZABBIX_SRC_URL = "https://OLD_ZABBIX/api_jsonrpc.php"
ZABBIX_DST_URL = "https://NEW_ZABBIX/api_jsonrpc.php"
ZABBIX_SRC_USER = "Admin"
ZABBIX_SRC_PASS = "zabbix"
ZABBIX_DST_USER = "Admin"
ZABBIX_DST_PASS = "zabbix"
VERIFY_SSL = False
# ── helpers ───────────────────────────────────────────────────────────────────
def make_session():
s = requests.Session()
s.verify = VERIFY_SSL
return s
def api_call(session, url, method, params, auth=None):
headers = {"Content-Type": "application/json"}
if auth:
headers["Authorization"] = f"Bearer {auth}"
resp = session.post(url, json={
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": 1,
}, headers=headers)
resp.raise_for_status()
data = resp.json()
if "error" in data:
raise RuntimeError(f"API error [{method}]: {data['error']}")
return data["result"]
def login(session, url, user, password):
try:
return api_call(session, url, "user.login",
{"username": user, "password": password})
except RuntimeError:
return api_call(session, url, "user.login",
{"user": user, "password": password})
def logout(session, url, token):
try:
api_call(session, url, "user.logout", [], auth=token)
except Exception:
pass
# ── main ──────────────────────────────────────────────────────────────────────
src = make_session()
dst = make_session()
# 1. Авторизация
print("Авторизация...")
src_token = login(src, ZABBIX_SRC_URL, ZABBIX_SRC_USER, ZABBIX_SRC_PASS)
dst_token = login(dst, ZABBIX_DST_URL, ZABBIX_DST_USER, ZABBIX_DST_PASS)
print(f" ✓ Источник: {ZABBIX_SRC_URL}")
print(f" ✓ Назначение: {ZABBIX_DST_URL}\n")
# 2. Получить все прокси с источника
print("Получаю прокси с источника...")
try:
# Zabbix 6.4+
src_proxies = api_call(src, ZABBIX_SRC_URL, "proxy.get", {
"output": [
"proxyid", "name", "operating_mode",
"tls_connect", "tls_accept",
"tls_issuer", "tls_subject",
"description",
],
}, auth=src_token)
except RuntimeError:
# Zabbix < 6.4
src_proxies = api_call(src, ZABBIX_SRC_URL, "proxy.get", {
"output": [
"proxyid", "host", "status",
"tls_connect", "tls_accept",
"tls_issuer", "tls_subject",
"proxy_address", "description",
],
}, auth=src_token)
# Приводим к единому формату
for p in src_proxies:
p["name"] = p.pop("host", "")
p["operating_mode"] = "0" if str(p.pop("status", "5")) == "5" else "1"
print(f" ✓ Прокси найдено: {len(src_proxies)}\n")
# 3. Получить существующие прокси на dst
print("Получаю прокси на назначении...")
try:
dst_proxies = api_call(dst, ZABBIX_DST_URL, "proxy.get", {
"output": ["proxyid", "name"],
}, auth=dst_token)
except RuntimeError:
dst_proxies = api_call(dst, ZABBIX_DST_URL, "proxy.get", {
"output": ["proxyid", "host"],
}, auth=dst_token)
for p in dst_proxies:
p["name"] = p.pop("host", "")
dst_proxy_map = {p["name"]: p["proxyid"] for p in dst_proxies}
print(f" ✓ Прокси на dst: {len(dst_proxies)}\n")
# 4. Создать/обновить прокси
ok, skip, fail = 0, 0, 0
failed = []
# Маппинг src proxyid → dst proxyid (нужен для привязки хостов)
proxy_id_map = {}
print(f"{'─'*60}")
for i, proxy in enumerate(src_proxies, start=1):
name = proxy["name"]
print(f"[{i}/{len(src_proxies)}] {name}")
# TLS PSK
psk_identity = proxy.get("tls_psk_identity", "")
psk = proxy.get("tls_psk", "")
tls_connect = proxy.get("tls_connect", "1")
tls_accept = proxy.get("tls_accept", "1")
PSK_FLAG = 2
if not psk_identity or not psk:
if str(tls_connect) == str(PSK_FLAG):
tls_connect = "1"
try:
tls_accept_int = int(tls_accept) & ~PSK_FLAG
tls_accept = str(tls_accept_int) if tls_accept_int > 0 else "1"
except ValueError:
tls_accept = "1"
psk_identity = ""
psk = ""
params = {
"name": name,
"operating_mode": proxy.get("operating_mode", "0"),
"description": proxy.get("description", ""),
"tls_connect": tls_connect,
"tls_accept": tls_accept,
"tls_issuer": proxy.get("tls_issuer", ""),
"tls_subject": proxy.get("tls_subject", ""),
}
if psk_identity and psk:
params["tls_psk_identity"] = psk_identity
params["tls_psk"] = psk
try:
if name in dst_proxy_map:
# Обновляем существующий
dst_proxyid = dst_proxy_map[name]
params["proxyid"] = dst_proxyid
api_call(dst, ZABBIX_DST_URL, "proxy.update",
params, auth=dst_token)
proxy_id_map[proxy["proxyid"]] = dst_proxyid
print(f" ↻ Обновлён (id={dst_proxyid})")
skip += 1
else:
# Создаём новый
result = api_call(dst, ZABBIX_DST_URL, "proxy.create",
params, auth=dst_token)
new_id = result["proxyids"][0]
proxy_id_map[proxy["proxyid"]] = new_id
dst_proxy_map[name] = new_id
print(f" ✓ Создан (id={new_id})")
ok += 1
except RuntimeError as e:
print(f" ✗ Ошибка: {e}")
failed.append((name, str(e)))
fail += 1
# 5. Привязать хосты к прокси
print(f"\n{'─'*60}")
print("Привязываю хосты к прокси...")
src_hosts = api_call(src, ZABBIX_SRC_URL, "host.get", {
"output": ["hostid", "host"],
"selectInterfaces": "count",
"proxyids": list(proxy_id_map.keys()),
}, auth=src_token)
# Получить proxyid для каждого хоста на источнике
src_hosts_full = api_call(src, ZABBIX_SRC_URL, "host.get", {
"output": ["hostid", "host", "proxyid"],
"hostids": [h["hostid"] for h in src_hosts],
}, auth=src_token)
# Получить хосты на dst
dst_hosts = api_call(dst, ZABBIX_DST_URL, "host.get", {
"output": ["hostid", "host"],
}, auth=dst_token)
dst_host_map = {h["host"]: h["hostid"] for h in dst_hosts}
host_ok, host_skip, host_fail = 0, 0, 0
for host in src_hosts_full:
hostname = host["host"]
src_proxyid = host.get("proxyid", "0")
if src_proxyid not in proxy_id_map:
continue
if hostname not in dst_host_map:
print(f" ⚠ Хост не найден на dst: {hostname}")
host_skip += 1
continue
dst_proxyid = proxy_id_map[src_proxyid]
dst_hostid = dst_host_map[hostname]
try:
# Zabbix 6.4+ использует proxyid, старее — proxy_hostid
try:
api_call(dst, ZABBIX_DST_URL, "host.update", {
"hostid": dst_hostid,
"proxyid": dst_proxyid,
}, auth=dst_token)
except RuntimeError:
api_call(dst, ZABBIX_DST_URL, "host.update", {
"hostid": dst_hostid,
"proxy_hostid": dst_proxyid,
}, auth=dst_token)
print(f" ✓ {hostname} → proxy id={dst_proxyid}")
host_ok += 1
except RuntimeError as e:
print(f" ✗ {hostname}: {e}")
host_fail += 1
# 6. Логаут
logout(src, ZABBIX_SRC_URL, src_token)
logout(dst, ZABBIX_DST_URL, dst_token)
# 7. Итог
print(f"\n{'='*60}")
print(f"Прокси:")
print(f" ✓ Создано: {ok}")
print(f" ↻ Обновлено: {skip}")
print(f" ✗ Ошибок: {fail}")
print(f"\nПривязка хостов к прокси:")
print(f" ✓ Привязано: {host_ok}")
print(f" ⚠ Пропущено: {host_skip}")
print(f" ✗ Ошибок: {host_fail}")
if failed:
print(f"\nПрокси с ошибками:")
for name, err in failed:
print(f" - {name}: {err}")
#!/usr/bin/env python3
import os
import re
import json
import requests
import urllib3
# Отключаем предупреждение если self-signed сертификат
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# === НАСТРОЙКИ ===
ZABBIX_URL = "https://zabbix.examples.com/api_jsonrpc.php"
ZABBIX_USER = "admin"
ZABBIX_PASS = "password"
OUTPUT_DIR = "./templates"
FORMAT = "yaml" # xml или yaml (yaml — Zabbix 6+)
VERIFY_SSL = False # True если есть валидный сертификат
os.makedirs(OUTPUT_DIR, exist_ok=True)
session = requests.Session()
session.verify = VERIFY_SSL
_id = 1
def api_call(method, params, auth=None):
global _id
payload = {
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": _id,
}
if auth:
payload["auth"] = auth
resp = session.post(ZABBIX_URL,
json=payload,
headers={"Content-Type": "application/json"})
resp.raise_for_status()
_id += 1
data = resp.json()
if "error" in data:
raise RuntimeError(f"API error [{method}]: {data['error']}")
return data["result"]
def safe_filename(name):
return re.sub(r'[ /\\:*?"<>|]', "_", name)
# === 1. АВТОРИЗАЦИЯ ===
print("Авторизация...")
try:
# Zabbix 5.4+
token = api_call("user.login", {"username": ZABBIX_USER, "password": ZABBIX_PASS})
except RuntimeError:
# Zabbix 5.2 и ниже
token = api_call("user.login", {"user": ZABBIX_USER, "password": ZABBIX_PASS})
print(f"Токен получен: {token}\n")
# === 2. ПОЛУЧИТЬ ВСЕ ШАБЛОНЫ ===
print("Получаю список шаблонов...")
templates = api_call(
"template.get",
{"output": ["templateid", "name"]},
auth=token
)
print(f"Найдено шаблонов: {len(templates)}\n")
# === 3. ЭКСПОРТИРОВАТЬ КАЖДЫЙ ШАБЛОН ===
ok, fail = 0, 0
for i, tpl in enumerate(templates, start=1):
tid = tpl["templateid"]
name = tpl["name"]
safe = safe_filename(name)
outfile = os.path.join(OUTPUT_DIR, f"{safe}__id{tid}.{FORMAT}")
print(f"[{i}/{len(templates)}] {name} (id={tid})")
try:
content = api_call(
"configuration.export",
{"options": {"templates": [tid]}, "format": FORMAT},
auth=token
)
with open(outfile, "w", encoding="utf-8") as f:
f.write(content)
size = os.path.getsize(outfile)
print(f" ✓ Сохранён: {outfile} ({size} байт)")
ok += 1
except Exception as e:
print(f" ✗ Ошибка: {e}")
fail += 1
# === 4. ЛОГАУТ ===
try:
api_call("user.logout", [], auth=token)
except Exception:
pass
# === ИТОГ ===
print(f"\n{'='*50}")
print(f"Готово! Директория: {OUTPUT_DIR}")
print(f" ✓ Успешно: {ok}")
print(f" ✗ Ошибок: {fail}")
print(f" Всего: {len(templates)}")
#!/usr/bin/env python3
import os
import re
import json
import requests
import urllib3
# Отключаем предупреждение если self-signed сертификат
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# === НАСТРОЙКИ ===
ZABBIX_URL = "https://zabbix.examples.com/api_jsonrpc.php"
ZABBIX_USER = "admin"
ZABBIX_PASS = "password"
INPUT_DIR = "./templates"
FORMAT = "yaml" # xml или yaml (yaml — Zabbix 6+)
VERIFY_SSL = False
session = requests.Session()
session.verify = VERIFY_SSL
_id = 1
def api_call(method, params, auth=None):
global _id
payload = {
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": _id,
}
headers = {"Content-Type": "application/json"}
# Zabbix 6.4+ — токен идёт в заголовке, не в теле
if auth:
headers["Authorization"] = f"Bearer {auth}"
resp = session.post(ZABBIX_URL, json=payload, headers=headers)
resp.raise_for_status()
_id += 1
data = resp.json()
if "error" in data:
raise RuntimeError(f"API error [{method}]: {data['error']}")
return data["result"]
# === 1. АВТОРИЗАЦИЯ ===
print("Авторизация...")
try:
token = api_call("user.login", {"username": ZABBIX_USER, "password": ZABBIX_PASS})
except RuntimeError:
token = api_call("user.login", {"user": ZABBIX_USER, "password": ZABBIX_PASS})
print(f"Токен получен: {token}\n")
# === 2. СОБРАТЬ ФАЙЛЫ ДЛЯ ИМПОРТА ===
files = sorted([
f for f in os.listdir(INPUT_DIR)
if f.endswith(f".{FORMAT}")
])
if not files:
print(f"Нет файлов .{FORMAT} в директории: {INPUT_DIR}")
exit(1)
print(f"Найдено файлов для импорта: {len(files)}\n")
# === 3. ИМПОРТИРОВАТЬ КАЖДЫЙ ШАБЛОН ===
ok, fail, skip = 0, 0, 0
failed_files = []
rules = {
"templates": {
"createMissing": True,
"updateExisting": True,
},
"items": {
"createMissing": True,
"updateExisting": True,
"deleteMissing": False,
},
"triggers": {
"createMissing": True,
"updateExisting": True,
"deleteMissing": False,
},
"graphs": {
"createMissing": True,
"updateExisting": True,
"deleteMissing": False,
},
"discoveryRules": {
"createMissing": True,
"updateExisting": True,
"deleteMissing": False,
},
"templateLinkage": {
"createMissing": True,
},
"valueMaps": {
"createMissing": True,
"updateExisting": True,
},
"httptests": {
"createMissing": True,
"updateExisting": True,
"deleteMissing": False,
},
}
for i, filename in enumerate(files, start=1):
filepath = os.path.join(INPUT_DIR, filename)
print(f"[{i}/{len(files)}] {filename}")
try:
with open(filepath, "r", encoding="utf-8") as f:
content = f.read()
if not content.strip():
print(f" ⚠ Пустой файл, пропускаем")
skip += 1
continue
api_call(
"configuration.import",
{
"format": FORMAT,
"rules": rules,
"source": content,
},
auth=token
)
print(f" ✓ Импортирован")
ok += 1
except RuntimeError as e:
print(f" ✗ Ошибка API: {e}")
failed_files.append((filename, str(e)))
fail += 1
except Exception as e:
print(f" ✗ Ошибка: {e}")
failed_files.append((filename, str(e)))
fail += 1
# === 4. ЛОГАУТ ===
try:
api_call("user.logout", [], auth=token)
except Exception:
pass
# === ИТОГ ===
print(f"\n{'='*50}")
print(f"Директория: {INPUT_DIR}")
print(f" ✓ Успешно: {ok}")
print(f" ⚠ Пропущено: {skip}")
print(f" ✗ Ошибок: {fail}")
print(f" Всего: {len(files)}")
if failed_files:
print(f"\nФайлы с ошибками:")
for fname, err in failed_files:
print(f" - {fname}")
print(f" {err}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment