Last active
February 10, 2022 11:35
-
-
Save artpel/abe4ef48b76abed1aead48aa28d15b96 to your computer and use it in GitHub Desktop.
Looker API Python Client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import os | |
import looker_sdk | |
class LookerClient: | |
# This is the Looker client to communicate to Looker API with their SDK | |
# and fetch our content metadata | |
# We don't fetch all the data from the response as they have a **lot** of useless fields and | |
# it's heavily memory-consuming | |
def __init__(self): | |
os.environ["LOOKERSDK_BASE_URL"] = | |
os.environ["LOOKERSDK_CLIENT_ID"] = | |
os.environ["LOOKERSDK_CLIENT_SECRET"] = | |
self.sdk = looker_sdk.init31() | |
def search_user_by_email(self, email): | |
print(f"Finding Looker users with their email: {email}") | |
users = self.sdk.search_users(email) | |
return users | |
def search_user_by_first_name(self, first_name: str): | |
print(f"Finding Looker users with their first_name: {first_name}") | |
users = self.sdk.search_users(first_name=first_name) | |
return users | |
def delete_user(self, user_id: str): | |
self.sdk.delete_user(user_id) | |
return | |
def get_user(self, user_id): | |
print(f"Getting Looker users with their ID: {user_id}") | |
user = self.sdk.user(user_id) | |
return user | |
def update_user(self, user_id, body): | |
print(f"Getting Looker users with this parameters: {body}") | |
updated_user = self.sdk.update_user(user_id, body) | |
return updated_user | |
def get_model(self, model): | |
print(f"Getting model {model}") | |
model = self.sdk.lookml_model(model) | |
return model | |
def get_explore_details(self, explore_name, model="swile-snowflake"): | |
print(f"Getting explore {explore_name} in model {model}") | |
explore = self.sdk.lookml_model_explore(model, explore_name) | |
return { | |
"id": explore["id"], | |
"name": explore["name"], | |
"description": explore["description"], | |
"source_file": explore["source_file"], | |
"model_name": explore["model_name"], | |
"base_view_name": explore["view_name"], | |
"sql_table_name": explore["sql_table_name"], | |
"scopes": explore["scopes"], | |
} | |
def get_fields_metadata_from_explore(self, explore_name, model="swile-snowflake"): | |
print(f"Getting fields from explore {explore_name} in model {model}") | |
explore = self.sdk.lookml_model_explore(model, explore_name) | |
dimensions = explore["fields"]["dimensions"] | |
measures = explore["fields"]["measures"] | |
lookml_fields = dimensions + measures | |
cleaned_lookml_fields = [] | |
for field in lookml_fields: | |
cleaned_lookml_fields.append( | |
{ | |
"explore": explore_name, | |
"view": field["view"], | |
"name": field["name"], | |
"label": field["label"], | |
"description": field["description"], | |
"hidden": field["hidden"], | |
"type": field["type"], | |
"primary_key": field["primary_key"], | |
"sql": field["sql"], | |
"lookml_field_type": self._determine_lookml_field_type(field["dimension_group"], field["measure"]), | |
"lookml_link": field["lookml_link"], | |
} | |
) | |
return cleaned_lookml_fields | |
def get_all_dashboard_ids(self): | |
print("Getting dashboard IDs for all dashboards") | |
all_dashboards = self.sdk.all_dashboards() | |
dashboard_ids = [] | |
for dashboard in all_dashboards: | |
if not dashboard["model"]: # to remove custom Dashboards not on swile-snowflake model | |
dashboard_ids.append(dashboard["id"]) | |
return dashboard_ids | |
def get_dashboard_metadata(self, dashboard_id): | |
dashboard_metadatas = self.sdk.dashboard(dashboard_id) | |
return { | |
"id": dashboard_metadatas["id"], | |
"title": dashboard_metadatas["title"], | |
"user_id": dashboard_metadatas["user_id"], | |
"folder": self._return_folder_data(dashboard_metadatas["folder"]), | |
"last_viewed_at": dashboard_metadatas["last_viewed_at"], | |
"deleted": dashboard_metadatas["deleted"], | |
} | |
def get_look(self, look_id): | |
print(f"Getting metadata from Look {look_id}") | |
look_metadatas = self.sdk.look(look_id) | |
return { | |
"id": look_metadatas["id"], | |
"title": look_metadatas["title"], | |
"created_at": look_metadatas["created_at"], | |
"deleted_at": look_metadatas["deleted_at"], | |
"last_accessed_at": look_metadatas["last_accessed_at"], | |
"query_id": look_metadatas["query_id"], | |
"user_id": look_metadatas["user_id"], | |
"folder": self._return_folder_data(look_metadatas["folder"]), | |
} | |
def get_query(self, query_id): | |
print(f"Getting metadata from Looker query {query_id}") | |
query_metadata = self.sdk.query(query_id) | |
return { | |
"id": query_metadata["id"], | |
"view": query_metadata["view"], | |
"fields": query_metadata["fields"], | |
"pivots": query_metadata["pivots"], | |
"filters": dict(query_metadata["filters"]) if query_metadata["filters"] else None, | |
"sorts": query_metadata["sorts"], | |
"limit": query_metadata["limit"], | |
"column_limit": query_metadata["column_limit"], | |
"share_url": query_metadata["share_url"], | |
} | |
def get_dashboard_visualization_elements(self, dashboard_id): | |
print(f"Getting dashboard visualization elements from Looker dashboard {dashboard_id}") | |
dashboard_metadatas = self.sdk.dashboard(dashboard_id) | |
dashboard_elements = dashboard_metadatas["dashboard_elements"] | |
dashboard_visualization_elements = [] | |
for element in dashboard_elements: | |
if element["type"] == "vis": | |
dashboard_visualization_elements.append( | |
{ | |
"id": element["id"], | |
"title": element["title"], | |
"note_text": element["note_text"], | |
"dashboard_id": element["dashboard_id"], | |
"query_id": element["query_id"], | |
"look_id": element["look_id"], | |
} | |
) | |
return dashboard_visualization_elements | |
def get_dashboard_filter_elements(self, dashboard_id): | |
print(f"Getting dashboard filter elements from Looker dashboard {dashboard_id}") | |
dashboard_metadatas = self.sdk.dashboard(dashboard_id) | |
dashboard_filters = dashboard_metadatas["dashboard_filters"] | |
dashboard_filter_elements = [] | |
for element in dashboard_filters: | |
dashboard_filter_elements.append( | |
{ | |
"id": element["id"], | |
"dashboard_id": element["dashboard_id"], | |
"name": element["name"], | |
"title": element["title"], | |
"type": element["type"], | |
"explore": element["explore"], | |
"dimension": element["dimension"], | |
"allow_multiple_values": element["allow_multiple_values"], | |
"required": element["required"], | |
"default_value": element["default_value"], | |
} | |
) | |
return dashboard_filter_elements | |
def get_all_looks(self): | |
print(f"Getting all looks") | |
cleaned_looks = [] | |
all_looks = self.sdk.all_looks() | |
for look in all_looks: | |
cleaned_looks.append( | |
{ | |
"id": look["id"], | |
"title": look["title"], | |
"created_at": look["created_at"], | |
"deleted_at": look["deleted_at"], | |
"last_accessed_at": look["last_accessed_at"], | |
"query_id": look["query_id"], | |
"user_id": look["user_id"], | |
"folder": self._return_folder_data(look["folder"]), | |
} | |
) | |
return cleaned_looks | |
def create_query(self, explore: str, fields: list, model="swile-snowflake", limit=0): | |
print(f"Creating query for fields {fields} in explore {explore}") | |
query_builder = {"model": model, "view": explore, "fields": fields, "limit": limit} | |
query_metadata = self.sdk.create_query(query_builder) | |
return query_metadata | |
def run_query(self, query_id, result_type="json"): | |
print(f"Running query {query_id}") | |
query_result = self.sdk.run_query(query_id, result_type) | |
return json.loads(query_result) | |
def get_content_errors_from_content_validator(self): | |
clean_errors = [] | |
content_in_error = self.sdk.content_validation() | |
for content in content_in_error["content_with_errors"]: | |
errors = content["errors"] | |
for error in errors: | |
look_in_error = content["look"] | |
dashboard_element_in_error = content["dashboard_element"] | |
dashboard_filter_in_error = content["dashboard_filter"] | |
clean_errors.append( | |
{ | |
"id": content["id"], | |
"look_id": look_in_error["id"] if look_in_error else None, | |
"dashboard_id": dashboard_element_in_error["dashboard_id"] | |
if dashboard_element_in_error | |
else dashboard_filter_in_error["dashboard_id"] | |
if dashboard_filter_in_error | |
else None, | |
"query_id": dashboard_element_in_error["query_id"] if dashboard_element_in_error else None, | |
"message": error["message"], | |
"field_name": error["field_name"], | |
"model_name": errors[0][ | |
"model_name" | |
], # Weird, but Looker API only set model_name and explore_name for first error | |
"explore_name": errors[0]["explore_name"], # Same as above | |
"error_location": self._return_content_error_location( | |
look_in_error, dashboard_element_in_error, dashboard_filter_in_error | |
), | |
} | |
) | |
return clean_errors | |
def _return_folder_data(self, folder): | |
if folder: | |
return {"folder_id": folder["id"], "in_personal_folder": folder["is_personal"]} | |
else: | |
return {} | |
def _return_content_error_location(self, look, dashboard_element, dashboard_filter): | |
if look: | |
return "look" | |
elif dashboard_element: | |
return "dashboard_element" | |
elif dashboard_filter: | |
return "dashboard_filter" | |
else: | |
None | |
def _determine_lookml_field_type(self, dimension_group, measure): | |
if dimension_group: | |
return "dimension_group" | |
elif measure: | |
return "measure" | |
else: | |
return "dimension" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment