Created
November 28, 2022 20:09
-
-
Save andyscott/c119f9503218693bd4362eff3da26ab4 to your computer and use it in GitHub Desktop.
Run a command on each pod that matches the given selection criteria.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env nix-shell
#!nix-shell -i python3
import argparse | |
import sys | |
import typing as t | |
import subprocess | |
def get_namespace(args: t.Sequence[str]) -> t.Optional[str]:
    """
    Inspects a set of arguments and identifies the kubernetes namespace.
    Expects the same parameters as kubectl command invocations.

    Both the long ``--namespace`` flag and kubectl's ``-n`` shorthand are
    recognised; every other argument is ignored.

    Returns the namespace value, or None when no namespace flag is present
    (or the flag is given without a value).
    """
    parser = argparse.ArgumentParser(add_help=False, exit_on_error=False)
    parser.add_argument("-n", "--namespace", type=str, nargs="?")
    # Parse the arguments we were handed rather than the process-wide
    # sys.argv, so the result depends only on the caller's input.
    parsed, _ = parser.parse_known_args(list(args))
    return parsed.namespace
def split_args(args: t.Iterable):
    """
    Partition an argument stream around the first "--" delimiter.

    Returns a pair of lists: the items seen before the first "--", and the
    items that follow it. The delimiter itself is dropped; any later "--"
    stays in the second list. Without a delimiter the second list is empty.
    """
    remaining = iter(args)
    before = []
    for token in remaining:
        if token == "--":
            # Delimiter consumed; whatever is left belongs to the second group.
            break
        before.append(token)
    after = list(remaining)
    return before, after
def get_pods(kubectl_args: t.Sequence[str]) -> t.Sequence[str]:
    """
    Lists the names of the pods selected by the given kubectl arguments.

    Runs ``kubectl get pods`` with ``kubectl_args`` inserted verbatim and an
    output format that prints one pod name per line, echoing the command
    to stdout first.

    kubectl_args may be any sequence of strings (list, tuple, ...).

    Returns the pod names, one entry per non-blank output line.
    Raises subprocess.CalledProcessError when kubectl exits non-zero.
    """
    # Unpack instead of concatenating with "+", so non-list sequences such
    # as tuples are accepted as the annotation promises.
    args = [
        "kubectl",
        "get",
        "pods",
        *kubectl_args,
        "-o",
        "custom-columns=name:metadata.name",
        "--no-headers",
    ]
    print("running:", " ".join(args))
    res = subprocess.run(
        args,
        stdout=subprocess.PIPE,
        check=True,
        encoding="utf-8",
    )
    # Strip padding and skip blank lines so callers never see an empty name.
    names = (line.strip() for line in res.stdout.splitlines())
    return [name for name in names if name]
# Script body: everything before "--" selects pods (arguments for
# `kubectl get pods`); everything after it is the command to run in each
# selected pod via `kubectl exec`.
first_group, second_group = split_args(sys.argv[1:])
if not second_group:
    # Without a command, `kubectl exec <pod> --` would fail on the first pod
    # with a traceback; stop early with a usage hint instead.
    sys.exit(
        f"usage: {sys.argv[0]} [kubectl get pods args...] -- command [args...]"
    )
namespace = get_namespace(first_group)
base_args = ["kubectl", "exec"]
if namespace:
    # Only the namespace is forwarded to `kubectl exec`; selector flags in
    # first_group apply to the pod listing alone.
    base_args += ["--namespace", namespace]
for pod in get_pods(first_group):
    print(">", pod)
    args = [*base_args, pod, "--", *second_group]
    print("running:", " ".join(args))
    # check=True aborts the whole run on the first pod whose command fails.
    subprocess.run(
        args,
        check=True,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment