Created
October 31, 2025 14:46
-
-
Save achetronic/98f454d40c7ac4fbabb5f597d693ca0b to your computer and use it in GitHub Desktop.
Super simple leader election in Kubernetes with PHP
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| <?php | |
/**
 * Minimal leader election built on a Kubernetes `coordination.k8s.io/v1` Lease.
 *
 * Every candidate periodically tries to write its identity into the same Lease
 * object. The candidate whose identity is stored there (and keeps refreshing
 * `renewTime`) is the leader; the others wait until the lease is older than its
 * duration and then take it over.
 *
 * Connection settings come either from the in-cluster service account files or
 * from a kubeconfig file (requires the `yaml` extension). Progress is reported
 * with plain `echo` lines.
 */
class KubernetesLeaderElection
{
    /** Directory where the in-cluster service account credentials are read from. */
    private const SERVICE_ACCOUNT_DIR = '/var/run/secrets/kubernetes.io/serviceaccount';

    /**
     * Upper bounds for a single API call. Without them a stalled API server
     * would block renew() forever while the worker keeps running as "leader".
     */
    private const CONNECT_TIMEOUT_SECONDS = 5;
    private const REQUEST_TIMEOUT_SECONDS = 10;

    private string $apiServer;
    private ?string $token = null;
    /** Path the bearer token is re-read from before each request (in-cluster only). */
    private ?string $tokenFile = null;
    private string $namespace;
    private string $leaseName;
    private string $identity;
    private int $leaseDuration;
    private ?string $caCert = null;
    private ?string $clientCert = null;
    private ?string $clientKey = null;
    private bool $insecure = false;

    /**
     * @param string      $namespace     Namespace that contains (or will contain) the Lease.
     * @param string      $leaseName     Name of the Lease object shared by all candidates.
     * @param int         $leaseDuration Seconds a lease stays valid without being renewed.
     * @param string|null $identity      Holder identity; defaults to the hostname, or a unique id if that is unavailable.
     * @param string|null $kubeconfig    Kubeconfig path; when given it takes precedence over in-cluster detection.
     *
     * @throws RuntimeException When no usable connection configuration can be built.
     */
    public function __construct(
        string $namespace,
        string $leaseName,
        int $leaseDuration = 15,
        ?string $identity = null,
        ?string $kubeconfig = null
    ) {
        $this->namespace = $namespace;
        $this->leaseName = $leaseName;
        $this->identity = $identity ?? (gethostname() ?: uniqid('php-', true));
        $this->leaseDuration = $leaseDuration;

        if ($this->isInCluster() && !$kubeconfig) {
            $this->configureInCluster();
        } else {
            $this->configureFromKubeconfig($kubeconfig ?? $this->getDefaultKubeconfigPath());
        }
    }

    /**
     * Reports whether the service account token file exists.
     */
    private function isInCluster(): bool
    {
        return file_exists(self::SERVICE_ACCOUNT_DIR . '/token');
    }

    /**
     * Builds the connection settings from the service account files and the
     * KUBERNETES_SERVICE_HOST / KUBERNETES_SERVICE_PORT environment variables.
     *
     * @throws RuntimeException When the environment variables or the token are unavailable.
     */
    private function configureInCluster(): void
    {
        $host = getenv('KUBERNETES_SERVICE_HOST');
        $port = getenv('KUBERNETES_SERVICE_PORT');
        if ($host === false || $host === '' || $port === false || $port === '') {
            throw new RuntimeException('KUBERNETES_SERVICE_HOST / KUBERNETES_SERVICE_PORT are not set');
        }

        // A bare IPv6 literal has to be bracketed inside a URL authority.
        if (strpos($host, ':') !== false && $host[0] !== '[') {
            $host = "[{$host}]";
        }

        $tokenFile = self::SERVICE_ACCOUNT_DIR . '/token';
        $token = file_get_contents($tokenFile);
        if ($token === false || trim($token) === '') {
            throw new RuntimeException("Unable to read service account token: {$tokenFile}");
        }

        $this->apiServer = "https://{$host}:{$port}";
        // Trimmed so a trailing newline never ends up inside the Authorization header.
        $this->token = trim($token);
        // Remembered so request() can pick up a refreshed token file.
        $this->tokenFile = $tokenFile;
        $this->caCert = self::SERVICE_ACCOUNT_DIR . '/ca.crt';
        $this->insecure = false;

        echo "[CONFIG] In-cluster: {$this->apiServer}\n";
    }

    /**
     * Returns `<home>/.kube/config`, taking the home directory from HOME or USERPROFILE.
     */
    private function getDefaultKubeconfigPath(): string
    {
        $home = getenv('HOME') ?: getenv('USERPROFILE');

        return $home . '/.kube/config';
    }

    /**
     * Builds the connection settings from the current context of a kubeconfig file.
     *
     * Supported credentials, in order of precedence: static token, inline client
     * certificate, client certificate files, exec credential plugin. Inline
     * certificate data is materialised as files in the system temp directory
     * because curl is given file paths.
     *
     * @throws RuntimeException When the file is missing or unparsable, or when the
     *                          context, cluster, user or credentials cannot be resolved.
     */
    private function configureFromKubeconfig(string $path): void
    {
        if (!file_exists($path)) {
            throw new RuntimeException("Kubeconfig not found in: {$path}");
        }

        $config = yaml_parse_file($path);
        if (!is_array($config) || !$config) {
            throw new RuntimeException("Failed parsing kubeconfig");
        }

        $currentContext = $config['current-context'] ?? null;
        if (!$currentContext) {
            throw new RuntimeException("Context not found in kubeconfig");
        }

        $context = $this->findNamedEntry($config['contexts'] ?? null, (string) $currentContext, 'context');
        if (!$context) {
            throw new RuntimeException("Context '{$currentContext}' not found");
        }

        $clusterName = (string) ($context['cluster'] ?? '');
        $cluster = $this->findNamedEntry($config['clusters'] ?? null, $clusterName, 'cluster');
        if (!$cluster) {
            throw new RuntimeException("Cluster '{$clusterName}' not found");
        }
        if (empty($cluster['server'])) {
            throw new RuntimeException("Cluster '{$clusterName}' has no server address");
        }
        $this->apiServer = (string) $cluster['server'];

        if (isset($cluster['certificate-authority-data'])) {
            $this->caCert = $this->writeTempFile(
                base64_decode($cluster['certificate-authority-data']),
                'k8s-ca-' . md5($cluster['certificate-authority-data']) . '.crt'
            );
            $this->insecure = false;
        } elseif (isset($cluster['certificate-authority'])) {
            $this->caCert = $cluster['certificate-authority'];
            $this->insecure = false;
        } else {
            $this->caCert = null;
            $this->insecure = (bool) ($cluster['insecure-skip-tls-verify'] ?? false);
        }

        $userName = (string) ($context['user'] ?? '');
        $user = $this->findNamedEntry($config['users'] ?? null, $userName, 'user');
        if (!$user) {
            throw new RuntimeException("User '{$userName}' not found");
        }

        if (isset($user['token'])) {
            $this->token = (string) $user['token'];
            $authMethod = 'token';
        } elseif (isset($user['client-certificate-data']) && isset($user['client-key-data'])) {
            $this->clientCert = $this->writeTempFile(
                base64_decode($user['client-certificate-data']),
                'k8s-client-cert-' . md5($user['client-certificate-data']) . '.crt'
            );
            $this->clientKey = $this->writeTempFile(
                base64_decode($user['client-key-data']),
                'k8s-client-key-' . md5($user['client-key-data']) . '.key'
            );
            $authMethod = 'client-cert';
        } elseif (isset($user['client-certificate']) && isset($user['client-key'])) {
            $this->clientCert = $user['client-certificate'];
            $this->clientKey = $user['client-key'];
            $authMethod = 'client-cert';
        } elseif (isset($user['exec'])) {
            // The plugin is executed once, here; the token it returns is not refreshed later.
            $this->token = $this->executeCredentialPlugin($user['exec']);
            $authMethod = 'exec-plugin';
        } else {
            throw new RuntimeException("Valid authn method not found");
        }

        echo "[CONFIG] Kubeconfig: {$this->apiServer} (auth: {$authMethod}, insecure: " .
            ($this->insecure ? 'yes' : 'no') . ")\n";
    }

    /**
     * Looks up the first kubeconfig list entry called $name and returns its payload.
     *
     * Kubeconfig stores contexts, clusters and users as lists of
     * `{name: ..., <payloadKey>: {...}}` items.
     *
     * @param mixed  $entries    The list to search; anything that is not an array yields null.
     * @param string $name       Entry name to look for.
     * @param string $payloadKey Key holding the entry's settings ('context', 'cluster' or 'user').
     *
     * @return array|null The payload, or null when there is no such entry or it has no array payload.
     */
    private function findNamedEntry($entries, string $name, string $payloadKey): ?array
    {
        if (!is_array($entries)) {
            return null;
        }

        foreach ($entries as $entry) {
            if (is_array($entry) && (string) ($entry['name'] ?? '') === $name) {
                $payload = $entry[$payloadKey] ?? null;

                return is_array($payload) ? $payload : null;
            }
        }

        return null;
    }

    /**
     * Writes $content to `<system temp dir>/<filename>` readable by the owner only.
     *
     * @return string Path of the written file.
     *
     * @throws RuntimeException When the file cannot be written or protected.
     */
    private function writeTempFile(string $content, string $filename): string
    {
        $path = sys_get_temp_dir() . '/' . $filename;

        // Restrict the creation mode so a new file is never group/world readable,
        // not even for the instant between creation and chmod().
        $previousUmask = umask(0077);
        try {
            $written = file_put_contents($path, $content, LOCK_EX);
        } finally {
            umask($previousUmask);
        }

        if ($written === false) {
            throw new RuntimeException("Unable to write temporary file: {$path}");
        }
        // Also covers a file that already existed with looser permissions.
        if (!chmod($path, 0600)) {
            throw new RuntimeException("Unable to restrict permissions of: {$path}");
        }

        return $path;
    }

    /**
     * Runs a kubeconfig `exec` credential plugin and returns the token it prints.
     *
     * The command, its arguments and the environment values are each shell-quoted,
     * so kubeconfig content cannot inject additional shell syntax.
     *
     * @param array $exec The `exec` section of a kubeconfig user (`command`, optional `args` and `env`).
     *
     * @throws RuntimeException When the plugin is misconfigured, produces no output,
     *                          or its output carries no `status.token`.
     */
    private function executeCredentialPlugin(array $exec): string
    {
        $command = $exec['command'] ?? null;
        if (!is_string($command) || $command === '') {
            throw new RuntimeException("Credentials plugin has no command");
        }

        $assignments = '';
        foreach (($exec['env'] ?? []) as $envVar) {
            $name = (string) ($envVar['name'] ?? '');
            // The name lands unquoted in front of the command, so only accept plain identifiers.
            if (!preg_match('/^[A-Za-z_][A-Za-z0-9_]*$/', $name)) {
                throw new RuntimeException("Invalid environment variable name in credentials plugin: '{$name}'");
            }
            $assignments .= $name . '=' . escapeshellarg((string) ($envVar['value'] ?? '')) . ' ';
        }

        $words = [escapeshellarg($command)];
        foreach (($exec['args'] ?? []) as $arg) {
            $words[] = escapeshellarg((string) $arg);
        }

        $output = shell_exec($assignments . implode(' ', $words));
        if (!$output) {
            throw new RuntimeException("Failed executing credentials plugin");
        }

        $credentials = json_decode($output, true);
        if (!isset($credentials['status']['token'])) {
            throw new RuntimeException("Credentials plugin didn't retrieve a token");
        }

        return $credentials['status']['token'];
    }

    /**
     * Returns the bearer token to send, re-reading the token file when one is configured.
     *
     * If the file cannot be read, the last token that was read successfully is kept.
     */
    private function currentToken(): ?string
    {
        if ($this->tokenFile !== null && is_readable($this->tokenFile)) {
            $fresh = file_get_contents($this->tokenFile);
            if ($fresh !== false && trim($fresh) !== '') {
                $this->token = trim($fresh);
            }
        }

        return $this->token;
    }

    /**
     * Sends one HTTP request to the API server.
     *
     * PATCH bodies are sent as strategic merge patches, everything else as JSON.
     *
     * @param string     $method HTTP verb.
     * @param string     $path   Path appended to the API server URL.
     * @param array|null $body   Payload to JSON-encode; nothing is sent when null or empty.
     *
     * @return array `['code' => int HTTP status, 'body' => decoded JSON, or null for an empty response]`
     *
     * @throws RuntimeException On transport failures, including timeouts.
     */
    private function request(string $method, string $path, ?array $body = null): array
    {
        $ch = curl_init("{$this->apiServer}{$path}");
        if ($ch === false) {
            throw new RuntimeException("Curl error: unable to initialise handle");
        }

        $contentType = ($method === 'PATCH')
            ? 'application/strategic-merge-patch+json'
            : 'application/json';
        $headers = ["Content-Type: {$contentType}"];

        $token = $this->currentToken();
        if ($token) {
            $headers[] = 'Authorization: Bearer ' . $token;
        }

        $opts = [
            CURLOPT_CUSTOMREQUEST => $method,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_HTTPHEADER => $headers,
            CURLOPT_CONNECTTIMEOUT => self::CONNECT_TIMEOUT_SECONDS,
            CURLOPT_TIMEOUT => self::REQUEST_TIMEOUT_SECONDS,
        ];

        if ($this->insecure) {
            $opts[CURLOPT_SSL_VERIFYPEER] = false;
            $opts[CURLOPT_SSL_VERIFYHOST] = false;
        } elseif ($this->caCert) {
            $opts[CURLOPT_CAINFO] = $this->caCert;
        }

        if ($this->clientCert && $this->clientKey) {
            $opts[CURLOPT_SSLCERT] = $this->clientCert;
            $opts[CURLOPT_SSLKEY] = $this->clientKey;
        }

        if ($body) {
            $opts[CURLOPT_POSTFIELDS] = json_encode($body);
        }

        // The handle is released on the error path as well.
        try {
            curl_setopt_array($ch, $opts);
            $response = curl_exec($ch);
            if (curl_errno($ch) !== 0) {
                throw new RuntimeException("Curl error: " . curl_error($ch));
            }
            $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        } finally {
            curl_close($ch);
        }

        return [
            'code' => $httpCode,
            'body' => $response ? json_decode($response, true) : null,
        ];
    }

    /**
     * Current UTC time in the microsecond format written to the lease time fields.
     */
    private function nowUtc(): string
    {
        return (new DateTimeImmutable('now', new DateTimeZone('UTC')))->format('Y-m-d\TH:i:s.u\Z');
    }

    /**
     * API path of the Lease collection in the configured namespace.
     */
    private function leaseCollectionPath(): string
    {
        return "/apis/coordination.k8s.io/v1/namespaces/{$this->namespace}/leases";
    }

    /**
     * API path of the Lease object used for the election.
     */
    private function leasePath(): string
    {
        return $this->leaseCollectionPath() . "/{$this->leaseName}";
    }

    /**
     * Makes one attempt to become (or remain) the leader.
     *
     * Creates the Lease when it does not exist, refreshes it when this instance
     * already holds it, and takes it over when the current holder let it expire.
     *
     * @return bool True when this instance holds the lease after the call.
     *
     * @throws RuntimeException On transport failures.
     */
    public function tryAcquire(): bool
    {
        $now = $this->nowUtc();
        $result = $this->request('GET', $this->leasePath());

        if ($result['code'] === 404) {
            return $this->createLease($now);
        }
        if ($result['code'] !== 200 || !is_array($result['body'])) {
            return false;
        }

        $lease = $result['body'];
        $spec = is_array($lease['spec'] ?? null) ? $lease['spec'] : [];
        $resourceVersion = $lease['metadata']['resourceVersion'] ?? null;
        if ($resourceVersion === null) {
            return false;
        }

        if (($spec['holderIdentity'] ?? null) === $this->identity) {
            return $this->updateLease($now, (string) $resourceVersion);
        }

        // Judge expiry by the duration the current holder recorded, not by ours:
        // a holder configured with a longer lease must not be evicted early.
        $recordedDuration = isset($spec['leaseDurationSeconds']) ? (int) $spec['leaseDurationSeconds'] : null;
        if (!$this->isExpired($spec['renewTime'] ?? null, $recordedDuration)) {
            return false;
        }

        // Takeover: record when and how often the lease changed hands.
        return $this->updateLease($now, (string) $resourceVersion, [
            'acquireTime' => $now,
            'leaseTransitions' => (int) ($spec['leaseTransitions'] ?? 0) + 1,
        ]);
    }

    /**
     * Creates the Lease with this instance as holder.
     *
     * @return bool True only when the API answers 201; false otherwise, e.g. when
     *              another candidate created the Lease first.
     */
    private function createLease(string $now): bool
    {
        $result = $this->request('POST', $this->leaseCollectionPath(), [
            'apiVersion' => 'coordination.k8s.io/v1',
            'kind' => 'Lease',
            'metadata' => ['name' => $this->leaseName],
            'spec' => [
                'holderIdentity' => $this->identity,
                'leaseDurationSeconds' => $this->leaseDuration,
                'acquireTime' => $now,
                'renewTime' => $now,
            ],
        ]);

        return $result['code'] === 201;
    }

    /**
     * Patches the Lease so that this instance is the holder as of $now.
     *
     * The resource version read beforehand is sent along with the patch so the
     * write can be rejected if the Lease changed in the meantime.
     *
     * @param string $now             Timestamp for `renewTime`.
     * @param string $resourceVersion Version of the Lease this update is based on.
     * @param array  $extraSpec       Additional `spec` fields to set (used on takeover).
     *
     * @return bool True when the API accepted the patch.
     */
    private function updateLease(string $now, string $resourceVersion, array $extraSpec = []): bool
    {
        $spec = array_merge($extraSpec, [
            'holderIdentity' => $this->identity,
            // Kept in sync with the holder's own setting so other candidates judge expiry correctly.
            'leaseDurationSeconds' => $this->leaseDuration,
            'renewTime' => $now,
        ]);

        $result = $this->request('PATCH', $this->leasePath(), [
            'spec' => $spec,
            'metadata' => ['resourceVersion' => $resourceVersion],
        ]);

        return in_array($result['code'], [200, 201], true);
    }

    /**
     * Refreshes the lease, provided this instance is still the recorded holder.
     *
     * @return bool False when the Lease cannot be read, is held by someone else,
     *              or the update is rejected.
     *
     * @throws RuntimeException On transport failures.
     */
    public function renew(): bool
    {
        $result = $this->request('GET', $this->leasePath());
        if ($result['code'] !== 200 || !is_array($result['body'])) {
            return false;
        }

        $lease = $result['body'];
        if (($lease['spec']['holderIdentity'] ?? null) !== $this->identity) {
            return false;
        }

        $resourceVersion = $lease['metadata']['resourceVersion'] ?? null;
        if ($resourceVersion === null) {
            return false;
        }

        return $this->updateLease($this->nowUtc(), (string) $resourceVersion);
    }

    /**
     * Tells whether a lease last renewed at $renewTime has run out.
     *
     * @param string|null $renewTime        Value of `spec.renewTime`; a missing or
     *                                      unparsable value counts as expired.
     * @param int|null    $recordedDuration `spec.leaseDurationSeconds` of the lease, if known;
     *                                      when absent or not positive, the locally
     *                                      configured duration is used.
     */
    private function isExpired(?string $renewTime, ?int $recordedDuration = null): bool
    {
        if (!$renewTime) {
            return true;
        }

        $renewTimestamp = strtotime($renewTime);
        if ($renewTimestamp === false) {
            return true;
        }

        $duration = ($recordedDuration !== null && $recordedDuration > 0)
            ? $recordedDuration
            : $this->leaseDuration;

        return (time() - $renewTimestamp) > $duration;
    }

    /**
     * Executes a child process when this instance gets the leadership.
     * The child process is killed automatically when leadership is lost.
     *
     * While leading, the parent renews the lease every $renewInterval seconds and
     * checks that the child is still alive. The child is also terminated when this
     * process receives SIGTERM or when an exception interrupts the renewal loop,
     * so no worker is left running without a lease. Requires the pcntl and posix
     * extensions. This method does not return normally.
     *
     * @param callable $callback      Work to run inside the forked child.
     * @param int      $renewInterval Seconds between lease renewals.
     *
     * @throws RuntimeException When forking fails or an API request cannot be sent.
     */
    public function runAsLeaderWithProcess(callable $callback, int $renewInterval = 10): void
    {
        echo "[ELECTION] Starting with identity: {$this->identity}\n";

        $childPid = null;

        // Handler for cleaning the child process on exit. The forked child inherits
        // it too; there $childPid is 0, so the handler simply exits.
        pcntl_async_signals(true);
        pcntl_signal(SIGTERM, function () use (&$childPid) {
            if ($childPid) {
                posix_kill($childPid, SIGTERM);
                pcntl_waitpid($childPid, $status);
            }
            exit(0);
        });

        while (true) {
            if (!$this->tryAcquire()) {
                echo date('Y-m-d H:i:s') . " [FOLLOWER] Waiting...\n";
                sleep(5);
                continue;
            }

            echo date('Y-m-d H:i:s') . " [LEADER] ✓ I'm the leader. Starting process...\n";

            // Fork: create child process
            $childPid = pcntl_fork();
            if ($childPid === -1) {
                $childPid = null;
                throw new RuntimeException("Failed to fork process");
            }

            if ($childPid === 0) {
                // Child process: execute the callback and never fall back into the
                // election loop, whatever the callback throws.
                try {
                    $callback();
                } catch (Throwable $e) {
                    echo "[CHILD] Error: " . $e->getMessage() . "\n";
                    exit(1);
                }
                exit(0);
            }

            // Parent process: renew the lease and supervise the child.
            $childReaped = false;
            try {
                while ($this->renew()) {
                    sleep($renewInterval);

                    // Verify if child is alive
                    $result = pcntl_waitpid($childPid, $status, WNOHANG);
                    if ($result !== 0) {
                        echo date('Y-m-d H:i:s') . " [LEADER] Child process exited\n";
                        // Only a positive answer means the child is gone and reaped;
                        // on -1 (error) it is still terminated below, to be safe.
                        $childReaped = ($result === $childPid);
                        break;
                    }
                }

                if (!$childReaped) {
                    echo date('Y-m-d H:i:s') . " [LEADER] ✗ Lost leadership. Killing child process...\n";
                }
            } finally {
                // Runs on lost leadership AND when renew() throws. A reaped child is
                // not signalled again: its PID may already belong to another process.
                if (!$childReaped) {
                    posix_kill($childPid, SIGTERM);
                    pcntl_waitpid($childPid, $status);
                }
                $childPid = null;
            }
        }
    }

    /**
     * Executes a callback periodically while this instance holds the leadership.
     *
     * The callback runs in this process after every successful renewal, followed by
     * a pause of $renewInterval seconds. This method does not return normally.
     *
     * @param callable $callback      Task to run on every renewal cycle.
     * @param int      $renewInterval Seconds to wait between cycles.
     *
     * @throws RuntimeException When an API request cannot be sent.
     */
    public function runAsLeader(callable $callback, int $renewInterval = 10): void
    {
        echo "[ELECTION] Starting with identity: {$this->identity}\n";

        while (true) {
            if (!$this->tryAcquire()) {
                echo date('Y-m-d H:i:s') . " [FOLLOWER] Waiting...\n";
                sleep(5);
                continue;
            }

            echo date('Y-m-d H:i:s') . " [LEADER] ✓ I'm the leader\n";
            while ($this->renew()) {
                $callback();
                sleep($renewInterval);
            }
            echo date('Y-m-d H:i:s') . " [LEADER] ✗ Lost leadership\n";
        }
    }
}
// Example 1: a long-lived worker that runs in a child process for as long as
// this instance holds the lease.
$worker = static function (): void {
    echo "[CHILD] Process started. Running infinite loop...\n";

    // Counts upwards forever, reporting one iteration every two seconds.
    for ($iteration = 1; ; $iteration++) {
        echo "[CHILD] Working... iteration " . $iteration . "\n";
        sleep(2);
    }
};

try {
    $election = new KubernetesLeaderElection('default', 'my-app-lease');
    $election->runAsLeaderWithProcess($worker);
} catch (Exception $failure) {
    // Any configuration or election failure ends the script with a non-zero status.
    echo "ERROR: " . $failure->getMessage() . "\n";
    exit(1);
}
| // Example 2: Periodic Callback | |
| /* | |
| try { | |
| $election = new KubernetesLeaderElection('default', 'my-app-lease'); | |
| $election->runAsLeader(function() { | |
| echo " → Executing leader task...\n"; | |
| }); | |
| } catch (Exception $e) { | |
| echo "ERROR: " . $e->getMessage() . "\n"; | |
| exit(1); | |
| } | |
| */ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment