class MyPipeline(_PipelineV2):
    # Everything is a method, so nothing happens at import time, for flexibility (although
    # possibly does a bit of discovery magic... need to think about that...)
    # Everything is a _static_ method: nothing on self, since things are run on different
    # bits of hardware, and gets any run-time dependencies injected in
    #
    # _PipelineV2 won't actually have any code: other parts of the system will interrogate its
    # subclasses as needed. For example:
    # - Code in Data Flow would construct a DAG
    # - The test harness would then run this and upstream pipelines synchronously

    @staticmethod
    def get_dependencies():
        # So
        # - data-flow will be able to work out this pipeline depends on this table
        # - will pass a database connection with appropriate permissions to the below functions
        # - ensure in tests that the harness has some explicit data for this or runs
        #   previous pipelines
        yield ('db-table', 'dit.my_grouping__my_table')
        # So credentials can be passed to get_table_configs and get_data, and we also
        # will be able to work out programmatically what pipelines depend on what APIs
        yield ('http-api', 'comtrade')

    @staticmethod
    def get_schedule(get_http_api_creds_and_client):
        # Or return a cron-format schedule
        # Or return a callable that would be polled to check if we should run it?
        return 'after-upstream'

    @staticmethod
    def get_table_configs(get_db_connection, get_http_api_creds_and_client):
        # Most pipelines will be hard-coded... only a few will use the db connection or
        # api creds to generate the table configs dynamically
        # Similar/same to the current TableConfigs
        yield ('dit.my_grouping__my_table', table_config_1)
        yield ('dit.my_grouping__my_other_table', table_config_2)

    @staticmethod
    def get_data(get_db_connection, get_http_api_creds_and_client, table_configs):
        # Does not know or care where the data is sent. Can query the database
        # using get_db_connection, or fetch credentials using get_http_api_creds_and_client
        yield 'dit.my_grouping__my_table', ('some-id-1', 1, 5.3)
        yield 'dit.my_grouping__my_table', ('some-id-2', 2, 5.3)
        yield 'dit.my_grouping__my_other_table', ('another-id', 2, 5.37)
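
As a rough illustration of the comment above that _PipelineV2 itself carries no code, here is a minimal sketch of how a consumer such as Data Flow might interrogate the subclasses to construct a DAG. The build_dag function and everything inside it are assumptions for illustration, not part of the design above:

from collections import defaultdict

def build_dag():
    # Hypothetical sketch: every concrete pipeline subclasses _PipelineV2, so
    # consumers can discover them all via Python's built-in __subclasses__()
    pipeline_classes = _PipelineV2.__subclasses__()

    # Map each table to the pipeline that produces it. The sketch passes no
    # real connection factories, leaning on most pipelines hard-coding their
    # table configs as noted above
    producers = {
        table_name: cls
        for cls in pipeline_classes
        for table_name, _config in cls.get_table_configs(None, None)
    }

    # Map each pipeline to the upstream pipelines whose tables it depends on
    upstream = defaultdict(set)
    for cls in pipeline_classes:
        for dep_type, dep_name in cls.get_dependencies():
            if dep_type == 'db-table' and dep_name in producers:
                upstream[cls].add(producers[dep_name])
    return upstream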
def test_my_pipeline(pipeline_runner):
    # pipeline_runner is essentially a fancy pytest fixture that pretends to be data-flow. It:
    # - populates a database with the mock data
    # - discovers and runs all upstream pipelines (that don't have mock data configured),
    #   and could even run them connecting to the datasets db if run from inside
    #   Data Workspace
    # - errors if there is something upstream without any mock data configured
    # - errors if some config doesn't make sense (e.g. schedule is "after-upstream",
    #   but there is no upstream table)
    # - configures requests mock with the mock http requests for API calls
    # - and at the end, cleans up (hence being a pytest fixture: they're good for that)
    data = pipeline_runner(MyPipeline, mock_dependencies=(
        ('db-table', 'dit.my_grouping__my_table', (
            {'id': 3, 'col_b': 2},
            {'id': 4, 'col_b': 6},
        )),
        ('http-api', 'comtrade', (
            # Sets up mock requests or similar
            ('http://some-url/', 'GET', '{"some":"data"}'),
        )),
    ))
    # Can then have lots of assertions on the data
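
For reference, a hedged sketch of how the pipeline_runner fixture could be structured, using pytest's factory-as-fixture pattern and the requests-mock library's pytest fixture. Everything here beyond the interface shown in the test above is an assumption; the real harness would also run upstream pipelines and validate the config, which this sketch elides:

import pytest

@pytest.fixture
def pipeline_runner(requests_mock):  # requests_mock: the requests-mock pytest fixture
    def run(pipeline_class, mock_dependencies=()):
        # Seed mock tables and register mock HTTP responses before running
        mock_tables = {}
        for dep_type, name, payload in mock_dependencies:
            if dep_type == 'db-table':
                mock_tables[name] = list(payload)
            elif dep_type == 'http-api':
                for url, method, body in payload:
                    requests_mock.register_uri(method, url, text=body)

        # mock_tables would be loaded into a test database here, and upstream
        # pipelines discovered and run for anything without mock data (elided)

        table_configs = dict(pipeline_class.get_table_configs(None, None))
        return list(pipeline_class.get_data(None, None, table_configs))

    yield run
    # Teardown (dropping mock tables etc.) would go here, which is why a
    # pytest fixture fits: it pairs setup with cleanup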