Created
January 31, 2017 08:17
-
-
Save kozikow/86ee26998c5de1cf3780b6bc0cd35ec0 to your computer and use it in GitHub Desktop.
Submitting a job to Kubernetes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hashlib
import logging
import math
import os
import re

import kubernetes
from kubernetes.client import V1Container
from kubernetes.client import V1EnvVar
from kubernetes.client import V1Job
from kubernetes.client import V1JobSpec
from kubernetes.client import V1ObjectMeta
from kubernetes.client import V1PodSpec
from kubernetes.client import V1PodTemplateSpec
from kubernetes.client import V1ResourceRequirements
# Namespace in which worker Jobs and their pods are created and cleaned up.
_KUBERNETES_NAMESPACE = "echoserver"
# Wall-clock limit applied to both the Job and its pods: twelve hours.
_DEADLINE_SECONDS = 12 * 60 * 60
# Per-tile memory allowance used by the memory estimate (500 KB, decimal).
_BYTES_PER_TILE = 500 * 1000
# Fixed memory baseline for a worker, independent of tile count (3000 MiB).
_BASE_MEM_BYTES = 3000 * 2 ** 20
def _plan_to_job_name(plan_id):
    """Derive a Kubernetes-safe Job name from ``plan_id``.

    The result is ``"worker-"`` followed by a sanitized plan id. It is also
    used as the container name, so it must be a valid DNS-1123 label:
    lowercase alphanumerics and ``-``, starting and ending with an
    alphanumeric, at most 63 characters.

    Args:
        plan_id: Arbitrary plan identifier string.

    Returns:
        A valid DNS-1123 label. Names that are too long are truncated and
        suffixed with a short hash of the full sanitized name, so different
        long plan ids still map to different names.
    """
    max_len = 63
    clean_plan = plan_id.replace("_", "-").lower()
    # Collapse each run of disallowed characters into a single hyphen.
    clean_plan = re.sub("[^-a-z0-9]+", "-", clean_plan)
    # A trailing hyphen (e.g. from a plan id ending in "!") is invalid.
    name = ("worker-" + clean_plan).rstrip("-")
    if len(name) > max_len:
        # The sanitized name is pure ASCII, so encoding cannot fail.
        digest = hashlib.sha1(name.encode("ascii")).hexdigest()[:8]
        # Strip again: the cut may land on a hyphen.
        name = name[:max_len - len(digest) - 1].rstrip("-") + "-" + digest
    return name
def _tiles_to_memory_limit(tiles):
    """Return the Kubernetes memory quantity needed to process ``tiles``.

    The estimate is ``_BASE_MEM_BYTES`` plus ``_BYTES_PER_TILE`` per tile,
    rounded *up* to whole mebibytes.

    Args:
        tiles: Sized collection of tiles; only ``len(tiles)`` is used.

    Returns:
        A quantity string such as ``"3001Mi"``, used for both the limit and
        the request of the worker container.
    """
    mib = 1024 * 1024
    bytes_req = _BASE_MEM_BYTES + _BYTES_PER_TILE * len(tiles)
    # Integer ceiling division. `math.ceil(a / b)` is wrong under Python 2,
    # where `/` on ints already floors, so it silently under-requests
    # memory. This form is exact on both Python 2 and 3.
    memory_mib = (bytes_req + mib - 1) // mib
    memory_req = "{}Mi".format(memory_mib)
    logging.info(
        "Memory requirement",
        extra={
            "tiles_len": len(tiles),
            "memory_req_bytes": bytes_req,
            "memory_req": memory_req
        })
    return memory_req
class KubernetesTfApi(object):
    """Submits one worker Job per plan to Kubernetes.

    Jobs and their pods are labelled ``plan=<plan_id>``. Before a new Job is
    created for a plan, any Jobs and pods carrying that label are deleted.
    """

    def __init__(self):
        logging.info("Initializing kubernetes api")
        # Load the in-cluster configuration; presumably this process runs
        # inside a pod with a service account.
        kubernetes.config.load_incluster_config()
        # Use the top-level export, like CoreV1Api below. The
        # `kubernetes.client.apis` package was renamed to `api` in newer
        # client releases, so deep paths into it are fragile.
        self._batch_api = kubernetes.client.BatchV1Api()
        self._core_api = kubernetes.client.CoreV1Api()

    def clean_old_job(self, plan_id):
        """Delete all Jobs and pods in the namespace labelled ``plan=plan_id``.

        Pods are deleted explicitly as a second call rather than relying on
        the Job deletion to remove them.

        NOTE(review): both calls only issue delete requests and do not wait
        for the objects to disappear. Confirm that a Job with the same name
        cannot still be terminating when ``request_plan_processing`` creates
        its replacement.
        """
        label_selector = "plan=={}".format(plan_id)
        self._batch_api.delete_collection_namespaced_job(
            namespace=_KUBERNETES_NAMESPACE,
            label_selector=label_selector,
        )
        self._core_api.delete_collection_namespaced_pod(
            namespace=_KUBERNETES_NAMESPACE,
            label_selector=label_selector
        )

    def request_plan_processing(self, plan_id, tiles):
        """Replace any existing Job for ``plan_id`` with a fresh worker Job.

        Args:
            plan_id: Plan identifier. It is passed to the worker as a CLI
                argument and used verbatim as the ``plan`` label value.
                NOTE(review): Kubernetes label values are limited to 63
                characters of ``[A-Za-z0-9_.-]``; confirm plan ids comply.
            tiles: Sized collection of tiles; only its length is used, to
                size the container's memory request and limit.
        """
        job_name = _plan_to_job_name(plan_id)
        self.clean_old_job(plan_id)
        # The same metadata (name + plan label) is used for the Job and for
        # its pod template, so the pods also carry the ``plan`` label.
        object_meta = V1ObjectMeta(name=job_name, labels={"plan": plan_id})
        # Request == limit, so the scheduler reserves exactly the estimate.
        memory_usage = {"memory": _tiles_to_memory_limit(tiles)}
        pod_spec = V1PodSpec(
            containers=[
                V1Container(
                    args=["python2", "/workspace/worker/main.py", plan_id],
                    env=[V1EnvVar(name="SERVER_STAGE",
                                  value=os.environ["SERVER_STAGE"]),
                         V1EnvVar(name="OPENBLAS_NUM_THREADS",
                                  value="2")],
                    image_pull_policy="IfNotPresent",
                    image="gcr.io/tensorflight/worker:27.0.0",
                    name=job_name,
                    resources=V1ResourceRequirements(
                        limits=memory_usage,
                        requests=memory_usage
                    )
                )
            ],
            restart_policy="OnFailure",
            active_deadline_seconds=_DEADLINE_SECONDS
        )
        job_spec = V1JobSpec(
            active_deadline_seconds=_DEADLINE_SECONDS,
            completions=1,
            parallelism=1,
            template=V1PodTemplateSpec(
                spec=pod_spec,
                metadata=object_meta
            ),
        )
        self._batch_api.create_namespaced_job(namespace=_KUBERNETES_NAMESPACE,
                                              body=V1Job(spec=job_spec,
                                                         metadata=object_meta))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi. I have a Python script, bh1.py, that I want to run on a cluster, but I don't know how to run it with your code. My script has already been copied into the /home/sassan/ directory on the cluster.