Skip to content

Instantly share code, notes, and snippets.

@achetronic
Created October 31, 2025 14:46
Show Gist options
  • Select an option

  • Save achetronic/98f454d40c7ac4fbabb5f597d693ca0b to your computer and use it in GitHub Desktop.

Select an option

Save achetronic/98f454d40c7ac4fbabb5f597d693ca0b to your computer and use it in GitHub Desktop.
Super simple leader election in Kubernetes with PHP
<?php
class KubernetesLeaderElection
{
private string $apiServer;
private ?string $token = null;
private string $namespace;
private string $leaseName;
private string $identity;
private int $leaseDuration;
private ?string $caCert = null;
private ?string $clientCert = null;
private ?string $clientKey = null;
private bool $insecure;
public function __construct(
string $namespace,
string $leaseName,
int $leaseDuration = 15,
?string $identity = null,
?string $kubeconfig = null
) {
$this->namespace = $namespace;
$this->leaseName = $leaseName;
$this->identity = $identity ?? (gethostname() ?: uniqid('php-', true));
$this->leaseDuration = $leaseDuration;
if ($this->isInCluster() && !$kubeconfig) {
$this->configureInCluster();
} else {
$this->configureFromKubeconfig($kubeconfig ?? $this->getDefaultKubeconfigPath());
}
}
private function isInCluster(): bool
{
return file_exists('/var/run/secrets/kubernetes.io/serviceaccount/token');
}
private function configureInCluster(): void
{
$this->apiServer = 'https://' . getenv('KUBERNETES_SERVICE_HOST') . ':' . getenv('KUBERNETES_SERVICE_PORT');
$this->token = file_get_contents('/var/run/secrets/kubernetes.io/serviceaccount/token');
$this->caCert = '/var/run/secrets/kubernetes.io/serviceaccount/ca.crt';
$this->insecure = false;
echo "[CONFIG] In-cluster: {$this->apiServer}\n";
}
private function getDefaultKubeconfigPath(): string
{
$home = getenv('HOME') ?: getenv('USERPROFILE');
return $home . '/.kube/config';
}
private function configureFromKubeconfig(string $path): void
{
if (!file_exists($path)) {
throw new RuntimeException("Kubeconfig not found in: {$path}");
}
$config = yaml_parse_file($path);
if (!$config) {
throw new RuntimeException("Failed parsing kubeconfig");
}
$currentContext = $config['current-context'] ?? null;
if (!$currentContext) {
throw new RuntimeException("Context not found in kubeconfig");
}
$context = null;
foreach ($config['contexts'] as $ctx) {
if ($ctx['name'] === $currentContext) {
$context = $ctx['context'];
break;
}
}
if (!$context) {
throw new RuntimeException("Context '{$currentContext}' not found");
}
$clusterName = $context['cluster'];
$cluster = null;
foreach ($config['clusters'] as $cls) {
if ($cls['name'] === $clusterName) {
$cluster = $cls['cluster'];
break;
}
}
if (!$cluster) {
throw new RuntimeException("Cluster '{$clusterName}' not found");
}
$this->apiServer = $cluster['server'];
if (isset($cluster['certificate-authority-data'])) {
$caCertPath = sys_get_temp_dir() . '/k8s-ca-' . md5($cluster['certificate-authority-data']) . '.crt';
file_put_contents($caCertPath, base64_decode($cluster['certificate-authority-data']));
$this->caCert = $caCertPath;
$this->insecure = false;
} elseif (isset($cluster['certificate-authority'])) {
$this->caCert = $cluster['certificate-authority'];
$this->insecure = false;
} else {
$this->caCert = null;
$this->insecure = $cluster['insecure-skip-tls-verify'] ?? false;
}
$userName = $context['user'];
$user = null;
foreach ($config['users'] as $usr) {
if ($usr['name'] === $userName) {
$user = $usr['user'];
break;
}
}
if (!$user) {
throw new RuntimeException("User '{$userName}' not found");
}
if (isset($user['token'])) {
$this->token = $user['token'];
$authMethod = 'token';
} elseif (isset($user['client-certificate-data']) && isset($user['client-key-data'])) {
$this->clientCert = $this->writeTempFile(
base64_decode($user['client-certificate-data']),
'k8s-client-cert-' . md5($user['client-certificate-data']) . '.crt'
);
$this->clientKey = $this->writeTempFile(
base64_decode($user['client-key-data']),
'k8s-client-key-' . md5($user['client-key-data']) . '.key'
);
$authMethod = 'client-cert';
} elseif (isset($user['client-certificate']) && isset($user['client-key'])) {
$this->clientCert = $user['client-certificate'];
$this->clientKey = $user['client-key'];
$authMethod = 'client-cert';
} elseif (isset($user['exec'])) {
$this->token = $this->executeCredentialPlugin($user['exec']);
$authMethod = 'exec-plugin';
} else {
throw new RuntimeException("Valid authn method not found");
}
echo "[CONFIG] Kubeconfig: {$this->apiServer} (auth: {$authMethod}, insecure: " .
($this->insecure ? 'yes' : 'no') . ")\n";
}
private function writeTempFile(string $content, string $filename): string
{
$path = sys_get_temp_dir() . '/' . $filename;
file_put_contents($path, $content);
chmod($path, 0600);
return $path;
}
private function executeCredentialPlugin(array $exec): string
{
$command = $exec['command'];
$args = $exec['args'] ?? [];
$env = $exec['env'] ?? [];
$envVars = '';
foreach ($env as $envVar) {
$envVars .= "{$envVar['name']}={$envVar['value']} ";
}
$fullCommand = $envVars . escapeshellcmd($command) . ' ' . implode(' ', array_map('escapeshellarg', $args));
$output = shell_exec($fullCommand);
if (!$output) {
throw new RuntimeException("Failed executing credentials plugin");
}
$credentials = json_decode($output, true);
if (!isset($credentials['status']['token'])) {
throw new RuntimeException("Credentials plugin didn't retrieve a token");
}
return $credentials['status']['token'];
}
private function request(string $method, string $path, ?array $body = null): array
{
$ch = curl_init("{$this->apiServer}{$path}");
$contentType = ($method === 'PATCH')
? 'application/strategic-merge-patch+json'
: 'application/json';
$headers = ["Content-Type: {$contentType}"];
if ($this->token) {
$headers[] = 'Authorization: Bearer ' . $this->token;
}
$opts = [
CURLOPT_CUSTOMREQUEST => $method,
CURLOPT_RETURNTRANSFER => true,
CURLOPT_HTTPHEADER => $headers,
];
if ($this->insecure) {
$opts[CURLOPT_SSL_VERIFYPEER] = false;
$opts[CURLOPT_SSL_VERIFYHOST] = false;
} elseif ($this->caCert) {
$opts[CURLOPT_CAINFO] = $this->caCert;
}
if ($this->clientCert && $this->clientKey) {
$opts[CURLOPT_SSLCERT] = $this->clientCert;
$opts[CURLOPT_SSLKEY] = $this->clientKey;
}
if ($body) {
$opts[CURLOPT_POSTFIELDS] = json_encode($body);
}
curl_setopt_array($ch, $opts);
$response = curl_exec($ch);
$httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
if (curl_errno($ch)) {
throw new RuntimeException("Curl error: " . curl_error($ch));
}
curl_close($ch);
return [
'code' => $httpCode,
'body' => $response ? json_decode($response, true) : null
];
}
public function tryAcquire(): bool
{
$now = (new DateTime('now', new DateTimeZone('UTC')))->format('Y-m-d\TH:i:s.u\Z');
$path = "/apis/coordination.k8s.io/v1/namespaces/{$this->namespace}/leases/{$this->leaseName}";
$result = $this->request('GET', $path);
if ($result['code'] === 404) {
return $this->createLease($now);
}
if ($result['code'] !== 200) {
return false;
}
$lease = $result['body'];
$holder = $lease['spec']['holderIdentity'] ?? null;
$renewTime = $lease['spec']['renewTime'] ?? null;
if ($holder === $this->identity || $this->isExpired($renewTime)) {
return $this->updateLease($now, $lease['metadata']['resourceVersion']);
}
return false;
}
private function createLease(string $now): bool
{
$body = [
'apiVersion' => 'coordination.k8s.io/v1',
'kind' => 'Lease',
'metadata' => ['name' => $this->leaseName],
'spec' => [
'holderIdentity' => $this->identity,
'leaseDurationSeconds' => $this->leaseDuration,
'acquireTime' => $now,
'renewTime' => $now,
]
];
$result = $this->request(
'POST',
"/apis/coordination.k8s.io/v1/namespaces/{$this->namespace}/leases",
$body
);
return $result['code'] === 201;
}
private function updateLease(string $now, string $resourceVersion): bool
{
$body = [
'spec' => [
'holderIdentity' => $this->identity,
'renewTime' => $now,
],
'metadata' => [
'resourceVersion' => $resourceVersion
]
];
$result = $this->request(
'PATCH',
"/apis/coordination.k8s.io/v1/namespaces/{$this->namespace}/leases/{$this->leaseName}",
$body
);
return in_array($result['code'], [200, 201]);
}
public function renew(): bool
{
$result = $this->request(
'GET',
"/apis/coordination.k8s.io/v1/namespaces/{$this->namespace}/leases/{$this->leaseName}"
);
if ($result['code'] !== 200) {
return false;
}
$lease = $result['body'];
if (($lease['spec']['holderIdentity'] ?? null) !== $this->identity) {
return false;
}
$now = (new DateTime('now', new DateTimeZone('UTC')))->format('Y-m-d\TH:i:s.u\Z');
return $this->updateLease($now, $lease['metadata']['resourceVersion']);
}
private function isExpired(?string $renewTime): bool
{
if (!$renewTime) return true;
$renewTimestamp = strtotime($renewTime);
return (time() - $renewTimestamp) > $this->leaseDuration;
}
/**
* Executes a child process when gets the leadership.
* Child process is killes automatically when leadership is lost.
*/
public function runAsLeaderWithProcess(callable $callback, int $renewInterval = 10): void
{
echo "[ELECTION] Starting with identity: {$this->identity}\n";
$childPid = null;
// Handler for cleaning child process on exit
pcntl_async_signals(true);
pcntl_signal(SIGTERM, function() use (&$childPid) {
if ($childPid) {
posix_kill($childPid, SIGTERM);
pcntl_waitpid($childPid, $status);
}
exit(0);
});
while (true) {
if ($this->tryAcquire()) {
echo date('Y-m-d H:i:s') . " [LEADER] ✓ I'm the leader. Starting process...\n";
// Fork: create child process
$childPid = pcntl_fork();
if ($childPid === -1) {
throw new RuntimeException("Failed to fork process");
}
if ($childPid === 0) {
// Child process: execute the callback
try {
$callback();
} catch (Exception $e) {
echo "[CHILD] Error: " . $e->getMessage() . "\n";
exit(1);
}
exit(0);
}
// Parent process: Renew the lease
while ($this->renew()) {
sleep($renewInterval);
// Verify if child is alive
$result = pcntl_waitpid($childPid, $status, WNOHANG);
if ($result !== 0) {
echo date('Y-m-d H:i:s') . " [LEADER] Child process exited\n";
break;
}
}
// Leadership is lost or child died: Kill child process
echo date('Y-m-d H:i:s') . " [LEADER] ✗ Lost leadership. Killing child process...\n";
posix_kill($childPid, SIGTERM);
pcntl_waitpid($childPid, $status);
$childPid = null;
} else {
echo date('Y-m-d H:i:s') . " [FOLLOWER] Waiting...\n";
sleep(5);
}
}
}
/**
* Executes a process periodically when gets the leadership.
*/
public function runAsLeader(callable $callback, int $renewInterval = 10): void
{
echo "[ELECTION] Starting with identity: {$this->identity}\n";
while (true) {
if ($this->tryAcquire()) {
echo date('Y-m-d H:i:s') . " [LEADER] ✓ I'm the leader\n";
while ($this->renew()) {
$callback();
sleep($renewInterval);
}
echo date('Y-m-d H:i:s') . " [LEADER] ✗ Lost leadership\n";
} else {
echo date('Y-m-d H:i:s') . " [FOLLOWER] Waiting...\n";
sleep(5);
}
}
}
}
// Example 1: Process that runs forever
try {
$election = new KubernetesLeaderElection('default', 'my-app-lease');
$election->runAsLeaderWithProcess(function() {
echo "[CHILD] Process started. Running infinite loop...\n";
$counter = 0;
while (true) {
echo "[CHILD] Working... iteration " . ++$counter . "\n";
sleep(2);
}
});
} catch (Exception $e) {
echo "ERROR: " . $e->getMessage() . "\n";
exit(1);
}
// Example 2: Periodic Callback
/*
try {
$election = new KubernetesLeaderElection('default', 'my-app-lease');
$election->runAsLeader(function() {
echo " → Executing leader task...\n";
});
} catch (Exception $e) {
echo "ERROR: " . $e->getMessage() . "\n";
exit(1);
}
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment