Skip to content

Instantly share code, notes, and snippets.

@rooty
Last active January 28, 2026 16:18
Show Gist options
  • Select an option

  • Save rooty/d381a13777012ce12cac287a3bc614d9 to your computer and use it in GitHub Desktop.

Select an option

Save rooty/d381a13777012ce12cac287a3bc614d9 to your computer and use it in GitHub Desktop.
php fpm check script (port and socket version )
{
"host": "/run/php/php8.2-fpm.sock",
"port": 0,
"path": "/php82-www",
"interval": 10,
"failcount": 3,
"treshold": 5,
"restart_command": "systemctl restart php8.2-fpm.service",
"holdoff": 30,
"timeout": 5,
"loglevel": "INFO"
}
#!/usr/bin/env python3
import sys
import json
import time
import socket
import os
import random
import logging
import signal
class FastCGIClient:
"""A Fast-CGI Client for Python 3 supporting TCP and Unix Sockets"""
__FCGI_VERSION = 1
__FCGI_ROLE_RESPONDER = 1
__FCGI_TYPE_BEGIN = 1
__FCGI_TYPE_PARAMS = 4
__FCGI_TYPE_STDIN = 5
__FCGI_TYPE_STDOUT = 6
__FCGI_TYPE_STDERR = 7
__FCGI_TYPE_END = 3
__FCGI_HEADER_SIZE = 8
def __init__(self, host, port, timeout, keepalive):
self.host = str(host)
self.port = port
self.timeout = timeout
self.keepalive = 1 if keepalive else 0
self.sock = None
self.requests = {}
def _is_unix_socket(self):
return self.host.startswith('/') or self.host.startswith('./')
def __connect(self):
if self.sock:
return
if self._is_unix_socket():
if not os.path.exists(self.host):
raise FileNotFoundError(f"Unix socket not found: {self.host}")
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
address = self.host
else:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
address = (self.host, int(self.port))
self.sock.settimeout(self.timeout)
try:
self.sock.connect(address)
except socket.error as msg:
self.close()
raise msg
def close(self):
"""Корректное закрытие сокета"""
if self.sock:
try:
self.sock.close()
except:
pass
self.sock = None
def __encodeFastCGIRecord(self, fcgi_type, content, requestid):
if isinstance(content, str):
content = content.encode('utf-8')
length = len(content)
header = bytes([
self.__FCGI_VERSION, fcgi_type,
(requestid >> 8) & 0xFF, requestid & 0xFF,
(length >> 8) & 0xFF, length & 0xFF,
0, 0
])
return header + content
def __encodeNameValueParams(self, name, value):
name = str(name).encode('utf-8')
value = str(value).encode('utf-8')
record = bytearray()
for length in [len(name), len(value)]:
if length < 128:
record.append(length)
else:
record.extend([
(length >> 24) | 0x80, (length >> 16) & 0xFF,
(length >> 8) & 0xFF, length & 0xFF
])
return record + name + value
def __decodeFastCGIRecord(self):
try:
header_data = self.sock.recv(self.__FCGI_HEADER_SIZE)
if not header_data: return False
v, t, rId_h, rId_l, cLen_h, cLen_l, pLen, _ = header_data
cLen = (cLen_h << 8) + cLen_l
rId = (rId_h << 8) + rId_l
content = bytearray()
while cLen > 0:
chunk = self.sock.recv(cLen)
if not chunk: break
content.extend(chunk)
cLen -= len(chunk)
if pLen > 0: self.sock.recv(pLen)
return {'type': t, 'requestId': rId, 'content': content}
except Exception:
return False
def request(self, nameValuePairs=None, post=''):
self.__connect()
requestId = random.randint(1, 65535)
self.requests[requestId] = {'response': bytearray()}
try:
# Begin
begin_content = bytes([0, self.__FCGI_ROLE_RESPONDER, self.keepalive] + [0]*5)
self.sock.sendall(self.__encodeFastCGIRecord(self.__FCGI_TYPE_BEGIN, begin_content, requestId))
# Params
if nameValuePairs:
params_data = bytearray()
for name, value in nameValuePairs.items():
params_data.extend(self.__encodeNameValueParams(name, value))
self.sock.sendall(self.__encodeFastCGIRecord(self.__FCGI_TYPE_PARAMS, params_data, requestId))
self.sock.sendall(self.__encodeFastCGIRecord(self.__FCGI_TYPE_PARAMS, b'', requestId))
# Stdin
if post:
self.sock.sendall(self.__encodeFastCGIRecord(self.__FCGI_TYPE_STDIN, post.encode(), requestId))
self.sock.sendall(self.__encodeFastCGIRecord(self.__FCGI_TYPE_STDIN, b'', requestId))
return self.__waitForResponse(requestId)
finally:
# Закрываем соединение после каждого запроса, если не keepalive
if not self.keepalive:
self.close()
def __waitForResponse(self, requestId):
while True:
record = self.__decodeFastCGIRecord()
if not record: break
if record['type'] in (self.__FCGI_TYPE_STDOUT, self.__FCGI_TYPE_STDERR):
if record['requestId'] == requestId:
self.requests[requestId]['response'].extend(record['content'])
if record['type'] == self.__FCGI_TYPE_END:
break
return self.requests[requestId]['response'].decode('utf-8', errors='replace')
def usage():
print(f"Usage: {sys.argv[0]} <config.json>", file=sys.stderr)
sys.exit(2)
def main():
if len(sys.argv) != 2:
usage()
# Флаг для управления циклом
running = True
def signal_handler(sig, frame):
nonlocal running
# Используем print, так как logging может быть уже не доступен или небезопасен в обработчике
print(f"\nЗавершение работы (сигнал {sig})...")
running = False
# Регистрация сигналов для systemd
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
with open(sys.argv[1]) as cf:
cfg = json.load(cf)
except Exception as e:
print(f"Error loading config: {e}", file=sys.stderr)
sys.exit(1)
#logging.basicConfig(
# format='%(asctime)s %(levelname)-8s %(message)s',
# level=cfg.get('loglevel', logging.INFO),
# datefmt='%Y-%m-%d %H:%M:%S'
#)
# В функции main() достаточно такой настройки:
logging.basicConfig(
format='%(levelname)-8s %(message)s', # Время systemd добавит сам
level=cfg.get('loglevel', logging.INFO),
handlers=[logging.StreamHandler(sys.stdout)]
)
client = FastCGIClient(cfg['host'], cfg['port'], cfg['timeout'], 0)
params = {
'GATEWAY_INTERFACE': 'FastCGI/1.0',
'REQUEST_METHOD': 'GET',
'SCRIPT_FILENAME': cfg['path'],
'SCRIPT_NAME': cfg['path'],
'QUERY_STRING': 'json',
'REQUEST_URI': cfg['path'],
'SERVER_SOFTWARE': 'python/fcgimonitor',
'REMOTE_ADDR': '127.0.0.1',
'SERVER_PROTOCOL': 'HTTP/1.1'
}
fails = 0
logging.warning(f"Started monitoring {cfg['host']}")
# Основной цикл теперь зависит от флага running
while running:
try:
raw_res = client.request(params)
body = raw_res.partition('\r\n\r\n')[2]
if not body:
raise ValueError("Empty response body from FastCGI")
r = json.loads(body)
pm = r.get('process manager', 'unknown')
is_ok = False
if pm == "static":
is_ok = r['active processes'] <= r['total processes'] * cfg['treshold']
elif pm == "dynamic":
is_ok = not (r['idle processes'] == 0 and r['listen queue'] > 0)
if is_ok:
fails = 0
logging.info(f"Status OK (PM: {pm})")
else:
fails += 1
logging.warning(f"Status BAD (PM: {pm}, Fails: {fails})")
except Exception as e:
if running: # Не логируем ошибку, если мы уже в процессе выхода
logging.error(f"Check failed: {e}")
fails += 1
# Проверка порога ошибок
if fails >= cfg['failcount'] and running:
logging.critical(f"Host down! Executing: {cfg['restart_command']}")
os.system(cfg['restart_command'])
fails = 0
# Ожидание после рестарта
wait_end = time.time() + cfg['holdoff']
while time.time() < wait_end and running:
time.sleep(1)
else:
# Обычный интервал между проверками
wait_end = time.time() + cfg['interval']
while time.time() < wait_end and running:
time.sleep(1)
# Чистка ресурсов перед завершением
client.close()
logging.warning("Service stopped cleanly.")
if __name__ == '__main__':
main()
[php82-www]
user = www-data
group = www-data
listen = /run/php/php8.2-fpm.sock
listen.owner = www-data
listen.group = www-data
pm = static
pm.max_children = 16
pm.status_path = /$pool
[Unit]
Description=PHP82 Watch Dog
After=network.target
[Service]
#User=www-data
#Group=www-data
WorkingDirectory=/opt/php-watch-dog
# Путь к интерпретатору внутри venv и путь к самому скрипту
ExecStart=/usr/bin/python3 fastcgi_monitor.py config82.json
# Автоматический перезапуск при падении
Restart=always
RestartSec=5
# Переменные окружения (если нужны)
Environment=PYTHONUNBUFFERED=1
#Environment="SECRET_KEY=my_secret_value"
# Направляем стандартный вывод в файл
StandardOutput=append:/var/log/php82-watch-dog.log
# Направляем ошибки туда же
StandardError=inherit
# Если хотите, чтобы systemd сам ограничивал размер файла (доступно в новых версиях):
#LogMaxSize=50M
[Install]
WantedBy=multi-user.target
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment