Forked by @StevenACoffman from kozikow/job_submit.py; created July 25, 2018 01:22.
Submitting a job to Kubernetes
import os
import re
import kubernetes
import logging
import math
from kubernetes.client import V1Container
from kubernetes.client import V1EnvVar
from kubernetes.client import V1Job
from kubernetes.client import V1JobSpec
from kubernetes.client import V1ObjectMeta
from kubernetes.client import V1PodSpec
from kubernetes.client import V1PodTemplateSpec
from kubernetes.client import V1ResourceRequirements

_KUBERNETES_NAMESPACE = "echoserver"
_DEADLINE_SECONDS = 60 * 60 * 12  # 12h
_BYTES_PER_TILE = 500000  # 500 KB
_BASE_MEM_BYTES = 3000 * 1024 * 1024  # 3000 MiB


def _plan_to_job_name(plan_id):
    # Job names must be valid DNS-1123 labels: lowercase alphanumerics and "-".
    clean_plan = plan_id.replace("_", "-").lower()
    clean_plan = re.sub("[^-a-z0-9]+", "-", clean_plan)
    return "worker-" + clean_plan


def _tiles_to_memory_limit(tiles):
    # Base allocation plus a per-tile allowance, rounded up to whole MiB.
    # Float division keeps the ceiling correct on both Python 2 and 3.
    bytes_req = _BASE_MEM_BYTES + _BYTES_PER_TILE * len(tiles)
    memory_req = str(int(math.ceil(bytes_req / (1024.0 * 1024.0)))) + "Mi"
    logging.info(
        "Memory requirement",
        extra={
            "tiles_len": len(tiles),
            "memory_req_bytes": bytes_req,
            "memory_req": memory_req,
        })
    return memory_req


class KubernetesTfApi(object):
    def __init__(self):
        logging.info("Initializing kubernetes api")
        # Assumes this process runs inside the cluster, so credentials come
        # from the pod's service account.
        kubernetes.config.load_incluster_config()
        self._batch_api = kubernetes.client.BatchV1Api()
        self._core_api = kubernetes.client.CoreV1Api()

    def clean_old_job(self, plan_id):
        # Deleting the Job collection is not guaranteed to cascade to its
        # pods here, so pods with the same label are deleted explicitly.
        label_selector = "plan=={}".format(plan_id)
        self._batch_api.delete_collection_namespaced_job(
            namespace=_KUBERNETES_NAMESPACE,
            label_selector=label_selector,
        )
        self._core_api.delete_collection_namespaced_pod(
            namespace=_KUBERNETES_NAMESPACE,
            label_selector=label_selector,
        )

    def request_plan_processing(self, plan_id, tiles):
        job_name = _plan_to_job_name(plan_id)
        self.clean_old_job(plan_id)
        object_meta = V1ObjectMeta(name=job_name, labels={"plan": plan_id})
        # Request and limit the same amount so the pod has a fixed memory budget.
        memory_usage = {"memory": _tiles_to_memory_limit(tiles)}
        pod_spec = V1PodSpec(
            containers=[
                V1Container(
                    args=["python2", "/workspace/worker/main.py", plan_id],
                    env=[V1EnvVar(name="SERVER_STAGE",
                                  value=os.environ["SERVER_STAGE"]),
                         V1EnvVar(name="OPENBLAS_NUM_THREADS",
                                  value="2")],
                    image_pull_policy="IfNotPresent",
                    image="gcr.io/tensorflight/worker:27.0.0",
                    name=job_name,
                    resources=V1ResourceRequirements(
                        limits=memory_usage,
                        requests=memory_usage,
                    ),
                )
            ],
            restart_policy="OnFailure",
            active_deadline_seconds=_DEADLINE_SECONDS,
        )
        job_spec = V1JobSpec(
            active_deadline_seconds=_DEADLINE_SECONDS,
            completions=1,
            parallelism=1,
            template=V1PodTemplateSpec(
                spec=pod_spec,
                metadata=object_meta,
            ),
        )
        self._batch_api.create_namespaced_job(
            namespace=_KUBERNETES_NAMESPACE,
            body=V1Job(spec=job_spec, metadata=object_meta))