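# A minimal sketch, for illustration only, of what the _PipelineV2 base class could look like:
# it deliberately carries no behaviour and just acts as a marker that other parts of the system
# (Data Flow, the test harness) discover via __subclasses__(). This definition is an assumption,
# not part of the proposal itself.
class _PipelineV2:
    pass

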
class MyPipeline(_PipelineV2):
    # Everything is a method, so nothing happens at import time, for flexibility (although possibly
    # does a bit of discovery magic... need to think about that...)
    # Everything is a _static_ method: nothing on self, since things are run on different bits of
    # hardware, and each gets any run-time dependencies injected in
    #
    # _PipelineV2 won't actually have any code: other parts of the system will interrogate its
    # subclasses as needed. For example
    # - Code in Data Flow would construct a DAG
    # - The test harness would then run this and upstream pipelines synchronously

    @staticmethod
    def get_dependencies():
        # So
        # - data-flow will be able to work out this pipeline depends on this table
        # - will pass a database connection with appropriate permissions to the functions below
        # - ensure in tests that the harness has some explicit data for this, or runs
        #   previous pipelines
        yield ('db-table', 'dit.my_grouping__my_table')
        # So credentials can be passed to get_table_configs and get_data, and we also
        # will be able to work out programmatically which pipelines depend on which APIs
        yield ('http-api', 'comtrade')

    @staticmethod
    def get_schedule(get_http_api_creds_and_client):
        # Or return a cron-format schedule
        # Or return a callable that would be polled to check if we should run it?
        return 'after-upstream'

    @staticmethod
    def get_table_configs(get_db_connection, get_http_api_creds_and_client):
        # Most pipelines will be hard coded... only a few will use the db connection or
        # API creds to generate the table configs dynamically
        # Similar/same to current TableConfigs
        yield ('dit.my_grouping__my_table', table_config_1)
        yield ('dit.my_grouping__my_other_table', table_config_2)

    @staticmethod
    def get_data(get_db_connection, get_http_api_creds_and_client, table_configs):
        # Does not know or care where the data is sent. Can query the database
        # using get_db_connection, or fetch credentials using get_http_api_creds_and_client
        yield 'dit.my_grouping__my_table', ('some-id-1', 1, 5.3)
        yield 'dit.my_grouping__my_table', ('some-id-2', 2, 5.3)
        yield 'dit.my_grouping__my_other_table', ('another-id', 2, 5.37)
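

# A hedged sketch of the "discovery" idea mentioned above: how Data Flow (or anything else)
# might interrogate _PipelineV2 subclasses to find out which pipelines declare which
# dependencies, as a starting point for building a DAG. The function name is hypothetical.
def discover_pipeline_dependencies():
    # Collect the declared dependencies of every registered pipeline, keyed by pipeline class
    return {
        pipeline: list(pipeline.get_dependencies())
        for pipeline in _PipelineV2.__subclasses__()
    }
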
def test_my_pipeline(pipeline_runner):
    # pipeline_runner is essentially a fancy pytest fixture that pretends to be data-flow. It:
    # - populates a database with the mock data
    # - discovers and runs all upstream pipelines (that don't have mock data configured),
    #   and could even run them connecting to the datasets db if run from inside
    #   Data Workspace
    # - errors if there is something upstream without any mock data configured
    # - errors if some config doesn't make sense (e.g. schedule is "after-upstream",
    #   but there is no upstream table)
    # - configures requests mock with the mock http requests for API calls
    # - and at the end, cleans up (hence being a pytest fixture: they're good for that)
    data = pipeline_runner(MyPipeline, mock_dependencies=(
        ('db-table', 'dit.my_grouping__my_table', (
            {'id': 3, 'col_b': 2},
            {'id': 4, 'col_b': 6},
        )),
        ('http-api', 'comtrade', (
            # Sets up mock requests or similar
            ('http://some-url/', 'GET', '{"some":"data"}'),
        )),
    ))
    # Can then have lots of assertions on the data
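

# A minimal sketch, under heavy assumptions, of the shape the pipeline_runner fixture could
# take. It only wires mock dependencies into the pipeline's static methods and collects the
# yielded rows per table; the real fixture would also manage a database, run upstream
# pipelines, and configure requests mocking. All names below other than MyPipeline and
# _PipelineV2 are hypothetical.
import collections

import pytest


@pytest.fixture
def pipeline_runner():
    def run(pipeline, mock_dependencies=()):
        mocks = {(kind, name): mock_data for kind, name, mock_data in mock_dependencies}

        # Hypothetical injected callables: real ones would return a db connection or
        # API credentials/client; here they just hand back the configured mock data
        def get_db_connection(table_name):
            return mocks.get(('db-table', table_name))

        def get_http_api_creds_and_client(api_name):
            return mocks.get(('http-api', api_name))

        table_configs = list(pipeline.get_table_configs(
            get_db_connection, get_http_api_creds_and_client))

        # Collect everything the pipeline yields, grouped by target table
        data = collections.defaultdict(list)
        for table_name, row in pipeline.get_data(
                get_db_connection, get_http_api_creds_and_client, table_configs):
            data[table_name].append(row)
        return data

    return run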