Skip to content

Instantly share code, notes, and snippets.

@kozikow
Last active September 6, 2016 15:30
Show Gist options
  • Save kozikow/1020c0474f62ebadd618be5244fda9d9 to your computer and use it in GitHub Desktop.
Save kozikow/1020c0474f62ebadd618be5244fda9d9 to your computer and use it in GitHub Desktop.
Schedule Kubernetes job
import os
from datetime import datetime, timedelta
from typing import List, Optional

import requests
from k8s.io.kubernetes.pkg.api.v1 import generated_pb2 as v1
from k8s.io.kubernetes.pkg.apis.batch.v1 import generated_pb2 as batch_v1
from k8s.io.kubernetes.pkg.runtime import generated_pb2 as runtime
# Media type used both as the HTTP Content-Type of the request and as the
# contentType recorded inside the runtime.Unknown envelope.
VND_KUBERNETES_PROTOBUF = "application/vnd.kubernetes.protobuf"
def _serialize_proto(proto, kind: str, api_version: str = "batch/v1") -> bytes:
    """Serialize *proto* inside a ``runtime.Unknown`` envelope.

    Args:
        proto: Protobuf message to send; its serialized bytes become the
            envelope's ``raw`` field.
        kind: Value stored in the envelope's ``TypeMeta.kind``.
        api_version: Value stored in the envelope's ``TypeMeta.apiVersion``.

    Returns:
        A 4-byte prefix followed by the serialized envelope.
    """
    # The hex string decodes to the four bytes b"k8s\x00".
    prefix = bytes.fromhex("6b387300")
    type_meta = runtime.TypeMeta(apiVersion=api_version, kind=kind)
    envelope = runtime.Unknown(
        typeMeta=type_meta,
        raw=proto.SerializeToString(),
        contentType=VND_KUBERNETES_PROTOBUF,
    )
    return prefix + envelope.SerializeToString()
def schedule_job(name: str, image: str, docker_args: Optional[List[str]] = None,
                 namespace="default"):
    """Create a single-container batch/v1 Job by POSTing to the API server.

    The API server base URL is read from the ``KUBERNETES_API_URL``
    environment variable.

    Args:
        name: Used as both the Job's metadata name and the container name.
        image: Container image to run.
        docker_args: Arguments passed to the container; ``None`` means none.
        namespace: Namespace the Job is created in.

    Returns:
        The ``requests.Response`` of the POST. HTTP error statuses are not
        raised here; callers should inspect ``status_code`` or call
        ``raise_for_status()`` to detect a rejected Job.

    Raises:
        KeyError: If ``KUBERNETES_API_URL`` is not set.
    """
    if docker_args is None:
        docker_args = []

    container = v1.Container(
        image=image,
        name=name,
        args=docker_args,
    )
    pod_spec = v1.PodSpec(
        containers=[container],
        restartPolicy="Never",
    )
    job = batch_v1.Job(
        metadata=v1.ObjectMeta(name=name, namespace=namespace),
        spec=batch_v1.JobSpec(
            template=v1.PodTemplateSpec(spec=pod_spec),
        ),
    )

    # Strip a trailing slash so a base URL such as "https://host/" does not
    # yield "https://host//apis/...".
    base_url = os.environ["KUBERNETES_API_URL"].rstrip("/")
    url = "{}/apis/batch/v1/namespaces/{}/jobs".format(base_url, namespace)

    # Return the response instead of dropping it so that a failed creation
    # is observable by the caller.
    return requests.post(
        url,
        data=_serialize_proto(job, kind="Job"),
        headers={"content-type": VND_KUBERNETES_PROTOBUF},
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment