Created
June 29, 2021 21:30
-
-
Save jerdog/fb00de499b125fec621ff4d574261360 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Connect to Orbit API using this module | |
""" | |
from invoke import task | |
from product_el.data_utils import utils # type: ignore | |
from product_el.snowflake import snow_connect # type: ignore | |
# Project configuration, loaded once at import time. The functions in this
# module read config["orbit"]["workspace_id"] and config["orbit"]["endpoints"].
# NOTE(review): where utils.load_config() reads from is not visible here.
config = utils.load_config()
def get_orbit_data(endpoint, workspace_id, page=1, items=250):
    """
    Fetch every page of an Orbit API endpoint, starting at ``page``.

    Args:
        endpoint: Orbit endpoint name. ``"activity_types"`` is requested
            directly under the API root; every other endpoint is requested
            under ``workspace_id``.
        workspace_id: Orbit workspace identifier placed in the URL path.
        page: First page number to request.
        items: Page size, sent as the ``items`` query parameter.

    Returns:
        A list with one entry per fetched page, each a shallow copy of what
        ``utils.get_json`` returned for that page. The list is empty when
        the first request returns nothing copyable.
    """
    base_url = "https://app.orbit.love/api/v1/"
    if endpoint == "activity_types":
        get_url = base_url + endpoint
    else:
        get_url = base_url + workspace_id + "/" + endpoint
    headers = {
        "Accept": "application/json",
        "Authorization": "Bearer " + utils.get_api_key("orbit"),
    }

    # Strategy: request the current page, then extend the limit by one each
    # time the response advertises a next link. The limit starts at the
    # requested page (not a fixed 1) so that a caller passing page > 1 still
    # gets its first request issued.
    page_limit = page
    output = []
    while page <= page_limit:
        print("Processing Orbit " + endpoint + " page " + str(page))
        params = {"page": page, "items": items}
        results = utils.get_json(get_url, headers=headers, params=params)
        try:
            output.append(results.copy())
        except AttributeError as err:
            # No copyable payload (presumably None from utils.get_json --
            # TODO confirm). There is nothing to inspect for a next link,
            # so stop paginating instead of failing on the lookup below.
            print(f"No data returned for endpoint {endpoint}")
            print(err)
            break

        page += 1
        try:
            next_page = results["links"]["next"]
        except (KeyError, TypeError):
            # Missing "links"/"next" keys, a null "links" value, or a
            # non-mapping payload all mean there is no further page.
            next_page = None
        if next_page is not None:
            page_limit += 1
    return output
def data_package(endpoint):
    """
    Build the load-ready payload for a single Orbit endpoint.

    Fetches the endpoint's data for the workspace configured under
    ``config["orbit"]["workspace_id"]`` and wraps it in a dict with the
    keys ``"data"``, ``"service"`` (always ``"orbit"``) and ``"endpoint"``.
    """
    orbit_workspace = config["orbit"]["workspace_id"]
    return {
        "data": get_orbit_data(endpoint=endpoint, workspace_id=orbit_workspace),
        "service": "orbit",
        "endpoint": endpoint,
    }
@task(default=True)
def load_orbit(ctx):  # pylint: disable=unused-argument
    """
    Invoke task: load each configured Orbit endpoint into Snowflake.

    Iterates ``config["orbit"]["endpoints"]``, builds the data package for
    each endpoint and passes it to ``snow_connect.load_data`` together with
    a Snowflake configuration obtained anew for every endpoint. ``ctx`` is
    not used by the body.
    """
    endpoint_names = config["orbit"]["endpoints"]
    for name in endpoint_names:
        package = data_package(name)
        snowflake_settings = utils.snowflake_config()
        snow_connect.load_data(package, snowflake_settings)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment