Skip to content

Instantly share code, notes, and snippets.

@adilkhash
Last active March 14, 2021 17:39
Show Gist options
  • Save adilkhash/7ac372554a33867effc8cc900b9a56d7 to your computer and use it in GitHub Desktop.
Save adilkhash/7ac372554a33867effc8cc900b9a56d7 to your computer and use it in GitHub Desktop.
Apache Airflow Rows Affected Catcher
import logging
from logging import Handler, LogRecord
from airflow.operators.python import get_current_context
class RowsAffectedHandler(Handler):
def emit(self, record: LogRecord) -> None:
msg = self.format(record)
context = get_current_context()
result = re.search(r'Rows affected: (?P<rowscount>\d+)$', msg)
if result:
rows = result.groupdict().get('rowscount')
context['ti'].xcom_push(value=f'{rows}', key='rows_affected')
handler = RowsAffectedHandler()
handler.addFilter(lambda record: record.getMessage().find('Rows affected') != -1)
logging.getLogger('airflow').addHandler(handler)
# ниже код вашего DAG
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment