-
-
Save StevenACoffman/a0b6232b9dd892b6a1b1853a57aed349 to your computer and use it in GitHub Desktop.
Submitting a job to Kubernetes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import re | |
import kubernetes | |
import logging | |
import math | |
from kubernetes.client import V1Container | |
from kubernetes.client import V1EnvVar | |
from kubernetes.client import V1Job | |
from kubernetes.client import V1JobSpec | |
from kubernetes.client import V1ObjectMeta | |
from kubernetes.client import V1PodSpec | |
from kubernetes.client import V1PodTemplateSpec | |
from kubernetes.client import V1ResourceRequirements | |
# Namespace in which worker Jobs (and their pods) are created and deleted.
_KUBERNETES_NAMESPACE = "echoserver"

# Upper bound on a worker run; set as active_deadline_seconds on both the
# Job spec and its pod spec.
_DEADLINE_SECONDS = 12 * 3600  # 12 hours

# Memory budget for a worker: a fixed base plus a per-tile increment.
_BYTES_PER_TILE = 500 * 1000  # 500 kB (decimal kilobytes)
_BASE_MEM_BYTES = 3000 * 2 ** 20  # 3000 MiB
def _plan_to_job_name(plan_id):
    """Derive a Kubernetes-safe Job name from a plan id.

    The plan id is lower-cased, underscores become dashes, and each run of
    any other character outside ``[-a-z0-9]`` is replaced by a single dash.
    The result is prefixed with ``worker-``.

    The name is also used as the container name, so it has to be a valid
    RFC 1123 DNS label: at most 63 characters and ending in an alphanumeric
    character. Trailing dashes are therefore stripped. A name that is still
    too long is truncated and suffixed with a short hash of the full
    sanitized name, so distinct long plan ids do not collide. Names that
    were already valid are returned unchanged.

    Args:
        plan_id: the plan identifier string.

    Returns:
        The Job name, e.g. ``"worker-my-plan-1"`` for ``"My_Plan.1"``.
    """
    import hashlib

    max_len = 63  # DNS-1123 label length limit
    clean_plan = plan_id.replace("_", "-").lower()
    clean_plan = re.sub(r"[^-a-z0-9]+", "-", clean_plan)
    # A trailing "-" (e.g. from "plan_") makes the name invalid.
    name = ("worker-" + clean_plan).rstrip("-")
    if len(name) > max_len:
        # The sanitized name is ASCII-only, so encoding cannot fail.
        digest = hashlib.sha1(name.encode("ascii")).hexdigest()[:8]
        prefix = name[:max_len - len(digest) - 1].rstrip("-")
        name = prefix + "-" + digest
    return name
def _tiles_to_memory_limit(tiles):
    """Compute the container memory request/limit for a batch of tiles.

    The budget is ``_BASE_MEM_BYTES`` plus ``_BYTES_PER_TILE`` per tile,
    rounded *up* to a whole MiB and formatted as a Kubernetes quantity
    string such as ``"3001Mi"``. The computed values are logged.

    Args:
        tiles: a sized collection of tiles; only ``len(tiles)`` is used.

    Returns:
        The memory quantity string, e.g. ``"3001Mi"``.
    """
    mib = 1024 * 1024
    tiles_len = len(tiles)
    bytes_req = _BASE_MEM_BYTES + _BYTES_PER_TILE * tiles_len
    # Exact integer ceiling division. ``math.ceil(bytes_req / mib)`` rounds
    # *down* under Python 2 (int ``/`` truncates before ceil runs) and goes
    # through floats under Python 3; this form is exact on both.
    memory_req = str(-(-bytes_req // mib)) + "Mi"
    logging.info(
        "Memory requirement",
        extra={
            "tiles_len": tiles_len,
            "memory_req_bytes": bytes_req,
            "memory_req": memory_req
        })
    return memory_req
class KubernetesTfApi(object):
    """Submits and cleans up per-plan worker Jobs via the Kubernetes API.

    Each plan is processed by a batch Job in ``_KUBERNETES_NAMESPACE`` that
    runs a single pod. The Job and its pod template both carry a ``plan``
    label, which ``clean_old_job`` uses to delete a previous run for the
    same plan.
    """

    def __init__(self):
        """Load the in-cluster config and create the Batch/Core API clients.

        Uses ``kubernetes.config.load_incluster_config()``, so this is
        presumably expected to run inside a pod with a service account.
        """
        logging.info("Initializing kubernetes api")
        kubernetes.config.load_incluster_config()
        # Use the package-level export (as for CoreV1Api below) rather than
        # the ``kubernetes.client.apis.batch_v1_api`` module path, which
        # newer client releases renamed to ``kubernetes.client.api``.
        self._batch_api = kubernetes.client.BatchV1Api()
        self._core_api = kubernetes.client.CoreV1Api()

    def clean_old_job(self, plan_id):
        """Delete all Jobs and pods labelled ``plan=<plan_id>``.

        Pods are deleted explicitly in addition to the Jobs, presumably
        because deleting a Job through the API does not necessarily cascade
        to its pods (confirm against the cluster's defaults).

        Args:
            plan_id: the plan identifier used as the ``plan`` label value.
        """
        label_selector = "plan=={}".format(plan_id)
        self._batch_api.delete_collection_namespaced_job(
            namespace=_KUBERNETES_NAMESPACE,
            label_selector=label_selector,
        )
        self._core_api.delete_collection_namespaced_pod(
            namespace=_KUBERNETES_NAMESPACE,
            label_selector=label_selector
        )

    def request_plan_processing(self, plan_id, tiles):
        """Submit a Kubernetes Job that runs the worker for ``plan_id``.

        Any existing Job/pods for the same plan are deleted first. The new
        Job runs one pod (``completions=1``, ``parallelism=1``) executing
        ``python2 /workspace/worker/main.py <plan_id>``. The pod's memory
        request and limit are both set from ``len(tiles)``.

        Args:
            plan_id: plan identifier. It is used verbatim as the ``plan``
                label value and, after sanitizing, as the Job/container name.
            tiles: sized collection of tiles; only its length is used.

        Raises:
            KeyError: if the ``SERVER_STAGE`` environment variable is unset.
        """
        job_name = _plan_to_job_name(plan_id)
        # NOTE(review): the deletions may finish asynchronously on the API
        # server. If the old Job still exists when the create below runs,
        # the create could be rejected as a name conflict -- confirm.
        self.clean_old_job(plan_id)
        # Shared by the Job and its pod template, so the pods also carry
        # the ``plan`` label that clean_old_job selects on.
        # NOTE(review): plan_id is not sanitized for use as a label value.
        object_meta = V1ObjectMeta(name=job_name, labels={"plan": plan_id})
        # The same quantity is used for both the request and the limit.
        memory_usage = {"memory": _tiles_to_memory_limit(tiles)}
        pod_spec = V1PodSpec(
            containers=[
                V1Container(
                    args=["python2", "/workspace/worker/main.py", plan_id],
                    env=[V1EnvVar(name="SERVER_STAGE",
                                  value=os.environ["SERVER_STAGE"]),
                         V1EnvVar(name="OPENBLAS_NUM_THREADS",
                                  value="2")],
                    image_pull_policy="IfNotPresent",
                    image="gcr.io/tensorflight/worker:27.0.0",
                    name=job_name,
                    resources=V1ResourceRequirements(
                        limits=memory_usage,
                        requests=memory_usage
                    )
                )
            ],
            restart_policy="OnFailure",
            active_deadline_seconds=_DEADLINE_SECONDS
        )
        job_spec = V1JobSpec(
            active_deadline_seconds=_DEADLINE_SECONDS,
            completions=1,
            parallelism=1,
            template=V1PodTemplateSpec(
                spec=pod_spec,
                metadata=object_meta
            ),
        )
        self._batch_api.create_namespaced_job(namespace=_KUBERNETES_NAMESPACE,
                                              body=V1Job(spec=job_spec,
                                                         metadata=object_meta))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment