Skip to content

Instantly share code, notes, and snippets.

@y2k-shubham
Created December 17, 2018 07:10
Show Gist options
  • Save y2k-shubham/275d9679ba3dd1e99f3e0ad401f69920 to your computer and use it in GitHub Desktop.
Save y2k-shubham/275d9679ba3dd1e99f3e0ad401f69920 to your computer and use it in GitHub Desktop.
Code snippet to create Airflow Pool
from typing import Optional

from airflow.models import Pool
from airflow.settings import Session
from airflow.utils.db import provide_session
# hive_pool is just an example; you might want to create other types of pools, such as one for MySQL.
@provide_session
def create_hive_pool(session: Optional[Session] = None) -> None:
    """Add a single-slot Hive pool to ``session``.

    The pool's name and description are read from ``pool_templates`` (keys
    ``'hive_name'`` and ``'hive_description'``). ``pool_templates`` is not
    defined in this snippet and must be provided by the surrounding module.

    Args:
        session: Session the new pool is added to. Presumably injected by
            ``@provide_session`` when omitted -- confirm against the
            installed Airflow version.
    """
    pool = Pool(pool=pool_templates['hive_name'],
                slots=1,
                description=pool_templates['hive_description'])
    # No explicit flush/commit here: persisting the pool is left to whoever
    # owns the session (NOTE(review): verify provide_session commits).
    session.add(pool)
@y2k-shubham
Copy link
Author

Note: This is just a possible pathway; I have NOT tested it.

Please go through the following links for possible downsides.

@y2k-shubham
Copy link
Author

I don't know why this is marked as experimental.

@y2k-shubham
Copy link
Author

I've come up with this (again, not tested):

from typing import Optional

from airflow.models import Connection, Variable
from airflow.settings import Session
from airflow.utils.db import provide_session


@provide_session
def create_connection(conn_id: str,
                      conn_type: str,
                      host: str,
                      port: Optional[int] = None,
                      user: Optional[str] = None,
                      password: Optional[str] = None,
                      schema: Optional[str] = None,
                      extra: Optional[str] = None,
                      session: Optional[Session] = None) -> None:
    """Build a ``Connection`` from the arguments and add it to ``session``.

    Args:
        conn_id: Identifier given to the new connection.
        conn_type: Connection type, passed through to ``Connection``.
        host: Host, passed through to ``Connection``.
        port: Optional port.
        user: Optional user name; passed to ``Connection`` as ``login``.
        password: Optional password.
        schema: Optional schema.
        extra: Optional extra string, passed through unchanged.
        session: Session the connection is added to and flushed on.
            Presumably injected by ``@provide_session`` when omitted --
            confirm against the installed Airflow version.
    """
    connection_fields = dict(
        conn_id=conn_id,
        conn_type=conn_type,
        host=host,
        port=port,
        login=user,  # the caller-facing name is ``user``; the model calls it ``login``
        password=password,
        schema=schema,
        extra=extra,
    )
    session.add(Connection(**connection_fields))
    session.flush()


@provide_session
def delete_connection(conn_id: str,
                      session: Optional[Session] = None) -> None:
    """Delete the ``Connection`` whose ``conn_id`` equals the given one.

    Args:
        conn_id: Identifier of the connection to remove.
        session: Session used for the lookup, delete and flush. Presumably
            injected by ``@provide_session`` when omitted -- confirm against
            the installed Airflow version.

    Raises:
        Whatever ``.one()`` raises when the filter does not match exactly one
        row (for a SQLAlchemy session: ``NoResultFound`` or
        ``MultipleResultsFound``).
    """
    matching = session.query(Connection).filter(Connection.conn_id == conn_id)
    # .one() makes a missing or duplicated conn_id an error instead of a no-op.
    session.delete(matching.one())
    session.flush()


def create_variable(key: str,
                    value: str,
                    serialize_json: bool = False) -> None:
    """Store ``value`` under ``key`` by delegating to ``Variable.set``.

    Args:
        key: Name of the variable.
        value: Value to store.
        serialize_json: Forwarded unchanged to ``Variable.set``.
    """
    # Keyword arguments are kept deliberately so the call does not depend on
    # the positional order of ``Variable.set``'s parameters.
    Variable.set(key=key, value=value, serialize_json=serialize_json)


@provide_session
def delete_variable(key: str, session: Optional[Session] = None) -> None:
    """Delete the ``Variable`` whose ``key`` equals the given one.

    Args:
        key: Name of the variable to remove.
        session: Session used for the lookup, delete and flush. Presumably
            injected by ``@provide_session`` when omitted -- confirm against
            the installed Airflow version.

    Raises:
        Whatever ``.one()`` raises when the filter does not match exactly one
        row (for a SQLAlchemy session: ``NoResultFound`` or
        ``MultipleResultsFound``).
    """
    matching = session.query(Variable).filter(Variable.key == key)
    # .one() makes a missing or duplicated key an error instead of a no-op.
    session.delete(matching.one())
    session.flush()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment