# -*- coding: utf-8 -*-
# Copyright 2019 Dineshkarthik Raveendran
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Last Tested --> 2021-05-18
# apache-airflow = "1.10.14"
# snowflake-sqlalchemy = "1.2.4"
"""Example DAG demonstrating the usage of the SnowflakeOperator & Hook."""
import logging

import airflow
from airflow import DAG
from airflow.contrib.hooks.snowflake_hook import SnowflakeHook
from airflow.contrib.operators.snowflake_operator import SnowflakeOperator
from airflow.operators.python_operator import PythonOperator

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

args = {"owner": "Airflow", "start_date": airflow.utils.dates.days_ago(2)}

# schedule_interval=None: the DAG runs only when triggered manually.
dag = DAG(
    dag_id="snowflake_connector", default_args=args, schedule_interval=None
)

# The statements are executed in order by a single SnowflakeOperator task.
create_insert_query = [
    """create table public.test_table (amount number);""",
    """insert into public.test_table values(1),(2),(3);""",
]


def row_count(**context):
    """Log the row count of the freshly populated table via SnowflakeHook."""
    dwh_hook = SnowflakeHook(snowflake_conn_id="snowflake_conn")
    result = dwh_hook.get_first("select count(*) from public.test_table")
    logger.info("Number of rows in `public.test_table` - %s", result[0])


with dag:
    create_insert = SnowflakeOperator(
        task_id="snowflake_create",
        sql=create_insert_query,
        snowflake_conn_id="snowflake_conn",
    )
    get_count = PythonOperator(task_id="get_count", python_callable=row_count)

    create_insert >> get_count
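The DAG above assumes an Airflow connection named snowflake_conn already exists; nothing in the gist creates it. Below is a minimal sketch of one way to register it, by writing directly to Airflow's metadata database (on Airflow 1.10.x the same fields can also be supplied on the command line via airflow connections --add); every credential value in it is a placeholder, not something taken from the gist:

import json

from airflow import settings
from airflow.models import Connection

# Sketch: register the "snowflake_conn" connection the DAG expects.
# All values are placeholders -- substitute your own Snowflake details.
snowflake_conn = Connection(
    conn_id="snowflake_conn",
    conn_type="snowflake",
    login="YOUR_USER",  # placeholder
    password="YOUR_PASSWORD",  # placeholder
    schema="PUBLIC",  # schema used by the queries in the DAG
    extra=json.dumps(
        {
            "account": "your_account",  # placeholder Snowflake account name
            "warehouse": "your_warehouse",  # placeholder
            "database": "your_database",  # placeholder
            "region": "your_region",  # placeholder, e.g. eu-central-1
            "role": "your_role",  # placeholder
        }
    ),
)

session = settings.Session()
# Skip the insert if the connection was already registered earlier.
if not session.query(Connection).filter(
    Connection.conn_id == snowflake_conn.conn_id
).first():
    session.add(snowflake_conn)
    session.commit()
session.close()

The account, warehouse, database, region and role keys end up in the connection's Extra field, which is where SnowflakeHook reads them from; user, password and schema come from the standard connection fields.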
Hi Dinesh,
I am new to Snowflake and Apache Airflow. Could you please guide me through the beginning steps: where do I need to copy this code, and how do I get to this web page (what is the URL for it)?

@shilpigu02 for more details read this article - https://itnext.io/connect-apache-airflow-to-snowflake-data-warehouse-37936a9edfa1
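On the beginning-steps question above, a rough outline assuming a stock Airflow 1.10.14 installation: this page is a GitHub Gist (a single shared code file), and the file itself belongs in your Airflow dags folder, for example ~/airflow/dags/snowflake_connector.py. Once the snowflake_conn connection sketched above exists, the DAG appears in "airflow list_dags", can be smoke-tested one task at a time with "airflow test snowflake_connector get_count 2021-05-18", and, since schedule_interval is None, runs only when triggered manually with "airflow trigger_dag snowflake_connector".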