Last active
March 8, 2018 01:53
-
-
Save kozikow/37ea56e9f972c6927827f4635ee162d5 to your computer and use it in GitHub Desktop.
Airflow config
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Generate airflow config to stdout. | |
// To start service the from scratch, put this in src/airflow_config/airflow_config.go and run: | |
// export GOPATH=$(pwd) | |
// export PATH=$PATH:$GOPATH/bin | |
// go get k8s.io/client-go/1.4/kubernetes | |
// go install airflow_config && airflow_config > airflow.yaml | |
// kubectl create -f airflow.yaml | |
package main | |
import ( | |
"fmt" | |
"log" | |
"os" | |
"path/filepath" | |
"strconv" | |
"k8s.io/client-go/1.4/pkg/api/unversioned" | |
"k8s.io/client-go/1.4/pkg/api/v1" | |
"k8s.io/client-go/1.4/pkg/apis/extensions/v1beta1" | |
"k8s.io/client-go/1.4/pkg/runtime" | |
"k8s.io/client-go/1.4/pkg/runtime/serializer/json" | |
"k8s.io/client-go/1.4/pkg/util/intstr" | |
) | |
type ImageName string | |
type AirflowTier string | |
type DeploymentType string | |
type PortBindRequest struct { | |
FromPort string // uniquely identifies container/port in the cluster by name. | |
ToPort int32 // Will get assigned ip. | |
} | |
type PathBindingRequest struct { | |
HostPath string | |
ContainerPath string | |
Name string | |
} | |
const ( | |
FlowerTier AirflowTier = "flower" | |
WebServerTier AirflowTier = "webserver" | |
SchedulerTier AirflowTier = "scheduler" | |
WorkerTier AirflowTier = "worker" | |
RabbitMqTier AirflowTier = "rabbitmq" | |
PostgresTier AirflowTier = "postgres" | |
AirflowImage ImageName = "gcr.io/all-the-codes/docker_airflow:0.3" | |
PostgresImage ImageName = "postgres" | |
RabbitMqImage ImageName = "rabbitmq:3-management" | |
KubectlImage ImageName = "gcr.io/all-the-codes/docker_base_arch:0.2" | |
ProdDeployment DeploymentType = "prod" | |
DevDeployment DeploymentType = "dev" | |
AirflowName string = "airflow" | |
AirflowHome string = "/home/user/airflow" | |
CodeDirectory string = "/home/user/code" | |
DockerUser string = "user" | |
DagMountName string = "dag-directory-binding" | |
ConfigMountName string = "config-directory-binding" | |
HttpPort = 80 | |
PostgresPort = 5432 | |
FlowerContainerPort = 5555 | |
AirflowLogProxyPort = 8793 | |
RabbitMqNodePort = 5672 | |
WebContainerPort = 8000 | |
KubernetesApiPort = 8001 | |
RabbitMqManagementPort = 15672 | |
PullPolicy = v1.PullIfNotPresent | |
) | |
func AirflowServiceMeta(tier AirflowTier) v1.ObjectMeta { | |
return v1.ObjectMeta{ | |
Name: string(tier), | |
Labels: map[string]string{ | |
"app": string(tier)}} | |
} | |
func AirflowDeploymentSelector(tier AirflowTier) map[string]string { | |
return map[string]string{ | |
"app": AirflowName, | |
"tier": string(tier)} | |
} | |
func InternalPortName(tier AirflowTier, port int32) string { | |
return fmt.Sprintf("%s-%d", string(tier), port) | |
} | |
func AirflowTierArgs(tier AirflowTier) []string { | |
if tier == RabbitMqTier || tier == PostgresTier { | |
return []string{} | |
} else { | |
return []string{ | |
filepath.Join(CodeDirectory, "entry_point.py"), | |
string(tier)} | |
} | |
} | |
func AirflowPortBindService(serviceType v1.ServiceType, tier AirflowTier, bindRequests ...PortBindRequest) *v1.Service { | |
var servicePorts = []v1.ServicePort{} | |
for _, bindRequest := range bindRequests { | |
servicePorts = append(servicePorts, v1.ServicePort{ | |
Protocol: "TCP", | |
TargetPort: intstr.FromString(bindRequest.FromPort), | |
Name: bindRequest.FromPort, | |
Port: bindRequest.ToPort, | |
}) | |
} | |
return &v1.Service{ | |
Spec: v1.ServiceSpec{ | |
Type: serviceType, | |
Ports: servicePorts, | |
Selector: AirflowDeploymentSelector(tier)}, | |
ObjectMeta: AirflowServiceMeta(tier), | |
TypeMeta: unversioned.TypeMeta{ | |
Kind: "Service", | |
APIVersion: "v1"}, | |
} | |
} | |
func AirflowPodEnv(tier AirflowTier) []v1.EnvVar { | |
if tier == PostgresTier { | |
return []v1.EnvVar{ | |
v1.EnvVar{ | |
Name: "POSTGRES_USER", | |
Value: "airflow"}, | |
v1.EnvVar{ | |
Name: "POSTGRES_PASSWORD", | |
Value: "airflow"}, | |
v1.EnvVar{ | |
Name: "POSTGRES_DB", | |
Value: "airflow"}} | |
} else if tier == RabbitMqTier { | |
return []v1.EnvVar{ | |
v1.EnvVar{ | |
Name: "RABBITMQ_DEFAULT_USER", | |
Value: "airflow"}, | |
v1.EnvVar{ | |
Name: "RABBITMQ_DEFAULT_PASS", | |
Value: "airflow"}, | |
v1.EnvVar{ | |
Name: "RABBITMQ_DEFAULT_VHOST", | |
Value: "airflow"}} | |
} else { | |
baseEnv := []v1.EnvVar{ | |
v1.EnvVar{ | |
Name: "AIRFLOW_HOME", | |
Value: AirflowHome}, | |
// Rabbitmq and postgres are not exposed to the outside world, so password does not mattern as much ATM. | |
} | |
if tier == FlowerTier { | |
baseEnv = append(baseEnv, v1.EnvVar{ | |
Name: "FLOWER_PORT", | |
Value: strconv.Itoa(FlowerContainerPort)}) | |
} | |
if tier == WorkerTier { | |
baseEnv = append(baseEnv, v1.EnvVar{ | |
Name: "KUBERNETES_API_URL", | |
Value: fmt.Sprintf("http://127.0.0.1:%d", KubernetesApiPort)}) | |
} | |
return baseEnv | |
} | |
} | |
func GetVolumeMounts(depType DeploymentType) []v1.VolumeMount { | |
switch depType { | |
case ProdDeployment: | |
return []v1.VolumeMount{} | |
case DevDeployment: | |
return []v1.VolumeMount{ | |
v1.VolumeMount{ | |
Name: DagMountName, | |
ReadOnly: true, | |
MountPath: filepath.Join(CodeDirectory, "dags"), | |
}, | |
v1.VolumeMount{ | |
Name: ConfigMountName, | |
ReadOnly: true, | |
MountPath: filepath.Join("/home", DockerUser, ".config"), | |
}, | |
} | |
} | |
log.Fatal("Invalid deployment") | |
return nil | |
} | |
func AirflowPodContainers(depType DeploymentType, tier AirflowTier, image ImageName, ports []int32) []v1.Container { | |
var containerPorts = []v1.ContainerPort{} | |
for _, port := range ports { | |
containerPorts = append(containerPorts, v1.ContainerPort{ | |
Name: InternalPortName(tier, port), | |
ContainerPort: port}) | |
} | |
otherPodContainers := []v1.Container{} | |
if tier == WorkerTier { | |
// Kubernetes api server proxy sidecar container | |
// https://github.com/kubernetes/kubernetes/tree/master/examples/kubectl-container/ | |
otherPodContainers = []v1.Container{v1.Container{ | |
Name: "kubectl", | |
Image: string(KubectlImage), | |
ImagePullPolicy: PullPolicy, | |
Args: []string{"kubectl", "proxy", "-p", strconv.Itoa(KubernetesApiPort)}, | |
}} | |
} | |
return append(otherPodContainers, v1.Container{ | |
Name: string(tier), | |
Image: string(image), | |
ImagePullPolicy: PullPolicy, | |
VolumeMounts: GetVolumeMounts(depType), | |
Env: AirflowPodEnv(tier), | |
Ports: containerPorts, | |
Args: AirflowTierArgs(tier)}) | |
} | |
func GuessHostDagPath() string { | |
path, err := filepath.Abs(filepath.Dir(os.Args[0])) | |
if err != nil { | |
log.Fatal(err) | |
} | |
for filepath.Base(path) != "understandwork" { | |
path = filepath.Dir(path) | |
} | |
return filepath.Join(path, "docker_airflow", "dags") | |
} | |
func GetPodVolumes(depType DeploymentType) []v1.Volume { | |
switch depType { | |
case ProdDeployment: | |
return []v1.Volume{} | |
case DevDeployment: | |
return []v1.Volume{ | |
v1.Volume{ | |
Name: DagMountName, | |
VolumeSource: v1.VolumeSource{ | |
HostPath: &v1.HostPathVolumeSource{ | |
Path: GuessHostDagPath(), | |
}, | |
}, | |
}, | |
v1.Volume{ | |
Name: ConfigMountName, | |
VolumeSource: v1.VolumeSource{ | |
HostPath: &v1.HostPathVolumeSource{ | |
Path: filepath.Join("/home", os.Getenv("USER"), ".config"), | |
}, | |
}, | |
}, | |
} | |
} | |
log.Fatal("Invalid deployment") | |
return nil | |
} | |
func AirflowDeployment(depType DeploymentType, tier AirflowTier, image ImageName, exposedPorts ...int32) *v1beta1.Deployment { | |
return &v1beta1.Deployment{ | |
ObjectMeta: v1.ObjectMeta{Name: string(tier)}, | |
Spec: v1beta1.DeploymentSpec{ | |
Template: v1.PodTemplateSpec{ | |
Spec: v1.PodSpec{ | |
RestartPolicy: v1.RestartPolicyAlways, | |
Volumes: GetPodVolumes(depType), | |
Containers: AirflowPodContainers(depType, tier, image, exposedPorts)}, | |
ObjectMeta: v1.ObjectMeta{ | |
Name: string(tier), | |
Labels: map[string]string{ | |
"app": AirflowName, | |
"tier": string(tier)}}}}, | |
TypeMeta: unversioned.TypeMeta{ | |
Kind: "Deployment", | |
APIVersion: "extensions/v1beta1"}} | |
} | |
func GetDeploymentType(arg string) DeploymentType { | |
switch arg { | |
case string(DevDeployment): | |
return DevDeployment | |
case string(ProdDeployment): | |
return ProdDeployment | |
} | |
log.Fatal("No deployment type " + arg) | |
panic("Impossibru") | |
} | |
func main() { | |
depType := GetDeploymentType(os.Args[1]) | |
objects := []runtime.Object{ | |
AirflowDeployment(depType, WebServerTier, AirflowImage, WebContainerPort), | |
AirflowDeployment(depType, FlowerTier, AirflowImage, FlowerContainerPort), | |
AirflowDeployment(depType, SchedulerTier, AirflowImage), | |
AirflowDeployment(depType, WorkerTier, AirflowImage, AirflowLogProxyPort), | |
AirflowDeployment(depType, PostgresTier, PostgresImage, PostgresPort), | |
AirflowDeployment(depType, RabbitMqTier, RabbitMqImage, RabbitMqNodePort, RabbitMqManagementPort), | |
AirflowPortBindService(v1.ServiceTypeClusterIP, PostgresTier, | |
PortBindRequest{ | |
FromPort: InternalPortName(PostgresTier, PostgresPort), | |
ToPort: PostgresPort}), | |
AirflowPortBindService(v1.ServiceTypeClusterIP, RabbitMqTier, | |
PortBindRequest{ | |
FromPort: InternalPortName(RabbitMqTier, RabbitMqNodePort), | |
ToPort: RabbitMqNodePort}, | |
PortBindRequest{ | |
FromPort: InternalPortName(RabbitMqTier, RabbitMqManagementPort), | |
ToPort: RabbitMqManagementPort}), | |
AirflowPortBindService(v1.ServiceTypeLoadBalancer, WebServerTier, | |
PortBindRequest{ | |
FromPort: InternalPortName(WebServerTier, WebContainerPort), | |
ToPort: HttpPort}), | |
AirflowPortBindService(v1.ServiceTypeLoadBalancer, FlowerTier, | |
PortBindRequest{ | |
FromPort: InternalPortName(FlowerTier, FlowerContainerPort), | |
ToPort: HttpPort}), | |
} | |
e := json.NewYAMLSerializer(json.DefaultMetaFactory, nil, nil) | |
for _, object := range objects { | |
err := e.Encode(object, os.Stdout) | |
if err != nil { | |
panic(err) | |
} | |
fmt.Println("---") | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment