@fpopic
Last active May 19, 2021 19:44
import logging

from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SelectPostgresOperator(BaseOperator):
    """
    Executes sql code in a specific Postgres database
    and returns the table result as a pandas dataframe

    :param postgres_conn_id: reference to a specific postgres database
    :type postgres_conn_id: string
    :param sql: the sql code to be executed
    :type sql: Can receive a str representing a sql statement,
        a list of str (sql statements), or reference to a template file.
        Template references are recognized by str ending in '.sql'
    """

    template_fields = ('sql',)
    template_ext = ('.sql',)
    ui_color = '#ededed'

    @apply_defaults
    def __init__(
            self, sql,
            postgres_conn_id='select_postgres_default', autocommit=False,
            parameters=None,
            *args, **kwargs):
        # Initialise BaseOperator first, then set this operator's own attributes.
        super(SelectPostgresOperator, self).__init__(*args, **kwargs)
        self.sql = sql
        self.postgres_conn_id = postgres_conn_id
        self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        self.autocommit = autocommit
        self.parameters = parameters

    def execute(self, context):
        logging.info("Executing: " + str(self.sql))
        # Run the query through the hook and load the result into a DataFrame.
        df = self.hook.get_pandas_df(self.sql, parameters=self.parameters)
        logging.info("Result count: {0}".format(len(df)))
        return df
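
To try the `sql` + `parameters` pattern from `execute` without Airflow or a Postgres instance, here is a minimal stand-in sketch using only the standard library. It is not part of the gist: `select_rows` and the `users` table are hypothetical names, `sqlite3` replaces the Postgres connection, and it returns plain rows rather than a pandas DataFrame. Note that `sqlite3` uses `?` placeholders, while the driver behind `PostgresHook` (psycopg2) uses `%s`.

```python
import sqlite3


def select_rows(conn, sql, parameters=None):
    """Run one SELECT with optional bound parameters.

    Hypothetical helper: returns (column_names, rows) instead of a DataFrame.
    """
    cursor = conn.execute(sql, parameters or ())
    columns = [col[0] for col in cursor.description]
    return columns, cursor.fetchall()


# In-memory database standing in for the Postgres connection (assumption).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [(1, "ann"), (2, "bob")])

# Values are bound by the driver, never formatted into the SQL string.
columns, rows = select_rows(
    conn, "SELECT id, name FROM users WHERE id > ?", (1,)
)
print(columns, rows)
```

Passing values through `parameters` instead of string formatting lets the driver handle quoting, which is presumably why the operator forwards `parameters` to the hook unchanged.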

fpopic commented May 19, 2021

@fpopic did this code finally work for you? Have you updated it to the correct version? I have exactly the same issue.

I think it did. You can see in the revisions (https://gist.github.com/fpopic/35807489d9c36c7ea8bc4de6f966b5fc/revisions) that I swapped lines 32 and 34. :) I haven't used it for a while, so I don't know what state it's in today.
