Last active
March 5, 2024 16:21
-
-
Save stevewithington/6afb0ec5dcba6b58c3e3ede5e38cd3ae to your computer and use it in GitHub Desktop.
Refresh Power BI / Fabric with Python : Option 2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Tools used to get workspace, dataset names and refresh datasets. | |
Power BI tools to refresh a dataset using client secret authentication. | |
Sample use (client id and secret values directly): | |
tenant_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" | |
app_client_id = "XXXXXXXX-xxxx-xxxx-xxxx-xxxxxxxxxxxx" | |
app_client_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" | |
app_scope = "https://analysis.windows.net/powerbi/api" | |
auth_token = AuthToken(tenant_id, app_client_id, app_app_client_secret, app_scope) | |
workspace_name = "YourWorkspaceName" | |
pbi_refresh = PowerBIRefresh(workspace_name, auth_token) | |
dataset_name = "YourDatasetName" | |
pbi_refresh.refresh(dataset_name) | |
Sample use (keyvault): | |
tenant_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" | |
app_client_id_secretname = "your-app-client-id" | |
app_client_secret_secretname = "your-app-client-secret" | |
auth_token = AuthToken(tenant_id, app_client_id_secretname, app_client_secret_secretname) | |
workspace_name = "YourWorkspaceName" | |
pbi_refresh = PowerBIRefresh(workspace_name, auth_token) | |
dataset_name = "YourDatasetName" | |
pbi_refresh.refresh(dataset_name) | |
Kudos: <https://github.com/ProdataSQL/Fabric/blob/main/06_RefreshPowerBIDataset/pbi_refresh.py> | |
""" | |
import time | |
from os.path import join | |
import requests | |
try: | |
from notebookutils import mssparkutils | |
USE_MSSPARKUTILS = True | |
except ModuleNotFoundError: | |
USE_MSSPARKUTILS = False | |
# while this can be in the except statement - importing modules in an except | |
# confuses linters -_- | |
if not USE_MSSPARKUTILS: | |
from azure.keyvault.secrets import SecretClient | |
from azure.identity import DefaultAzureCredential | |
READ_STATUS_TIMER = 5 | |
REST_TIMEOUT = 10 | |
class AuthToken: | |
"""Class to retrieve token from tenant id, client id, seceret and scope. | |
:param str token_url: https://login.microsoftonline.com/ | |
:param str tenant_id: | |
:param str app_client_id: The id or keyvault secret name (if keyvault URL provided). | |
:param str app_client_secret: The id or keyvault secret name (if keyvault URL provided). | |
:param str scope: https://analysis.windows.net/powerbi/api | |
:param str keyvault_url: https://{key-vault-name}.vault.azure.net/ or {key-vault-name}. | |
""" | |
def __init__( | |
self, | |
tenant_id, | |
app_client_id, | |
app_client_secret, | |
scope="https://analysis.windows.net/powerbi/api", | |
keyvault: str or None = None, | |
token_url: str or None = "https://login.microsoftonline.com/", | |
): | |
self.access_token = None | |
# Set the Token URL for Azure AD Endpoint | |
if token_url is not None: | |
self.token_url = f"{token_url}{tenant_id}/oauth2/token" | |
else: | |
self.token_url = ( | |
f"https://login.microsoftonline.com/{tenant_id}/oauth2/token" | |
) | |
if keyvault: | |
if not keyvault.startswith("https://"): | |
keyvault = f"https://{keyvault}.vault.azure.net/" | |
if USE_MSSPARKUTILS: | |
app_client_id = mssparkutils.credentials.getSecret( | |
keyvault, app_client_id | |
) | |
app_client_secret = mssparkutils.credentials.getSecret( | |
keyvault, app_client_secret | |
) | |
else: | |
secret_client = SecretClient( | |
vault_url=keyvault, credential=DefaultAzureCredential() | |
) | |
app_client_id = secret_client.get_secret(app_client_id).value | |
app_client_secret = secret_client.get_secret(app_client_secret).value | |
self.set_token(app_client_id, app_client_secret, scope) | |
def set_token(self, client_id, client_secret, scope): | |
"""Sets the classes token value. | |
:param str client_id: | |
:param str client_secret: | |
:param str scope: | |
:return: None | |
""" | |
data = { | |
"grant_type": "client_credentials", | |
"client_id": client_id, | |
"client_secret": client_secret, | |
"resource": scope, | |
} | |
# Send POS request to obtain access token | |
response = requests.post(self.token_url, data=data, timeout=REST_TIMEOUT) | |
response.raise_for_status() | |
token_data = response.json() | |
self.access_token = token_data["access_token"] | |
class PowerBIRefresh: | |
"""Class of tools to handle power BI refreshing. | |
:param str base_url: https://api.powerbi.com/v1.0/myorg/ | |
:param AuthToken auth_token: | |
:param workspace_name: | |
""" | |
def __init__( | |
self, | |
workspace_name, | |
auth_token: AuthToken or str, | |
base_url: str or None = "https://api.powerbi.com/v1.0/myorg/", | |
): | |
if isinstance(auth_token,str): | |
self.headers = {"Authorization": f"Bearer {auth_token}"} | |
elif isinstance(auth_token,AuthToken): | |
self.headers = {"Authorization": f"Bearer {auth_token.access_token}"} | |
self.base_url = base_url | |
if isinstance(workspace_name, str): | |
self.workspace_id = self.get_workspace_id(workspace_name) | |
elif isinstance(workspace_name, list): | |
self.workspace_id = self.get_workspace_id(workspace_name[0]) | |
def get_workspace_id(self, workspace_name) -> str: | |
"""Returns a workspace name. | |
:param str workspace_name: | |
:raises WorkspaceNameNotFoundException: | |
:return: Id of workspace. | |
:rtype: str | |
""" | |
relative_url = join(self.base_url, "groups") | |
response = requests.get( | |
relative_url, headers=self.headers, timeout=REST_TIMEOUT | |
) | |
if not response.ok: | |
response.raise_for_status() | |
workspaces = response.json()["value"] | |
for workspace in workspaces: | |
if workspace["name"] == workspace_name: | |
self.workspace_id = workspace["id"] | |
return self.workspace_id | |
raise WorkspaceNameNotFoundException(workspace_name) | |
def get_dataset_ids(self, dataset_names, workspace_id=None) -> list: | |
"""Returns a list of dataset ids from a list of dataset names. | |
:param list(str) dataset_names: | |
:param workspace_id: | |
:type workspace_id: str or None | |
:raises DatasetNameNotFoundException: | |
:return: list of dataset ids | |
:rtype: list | |
""" | |
workspace_id = self.workspace_id if workspace_id is None else workspace_id | |
relative_url = join(self.base_url, f"groups/{workspace_id}/datasets") | |
# Set the GET response using the relative URL | |
response = requests.get( | |
relative_url, headers=self.headers, timeout=REST_TIMEOUT | |
) | |
if not response.ok: | |
response.raise_for_status() | |
dataset_ids = [] | |
datasets = response.json()["value"] | |
for dataset in datasets: | |
for dataset_name in dataset_names: | |
if dataset["name"] == dataset_name and dataset["isRefreshable"] is True: | |
dataset_ids.append(dataset["id"]) | |
return dataset_ids | |
raise DatasetNameNotFoundException(dataset_names) | |
def get_dataset_name(self, dataset_id, workspace_id=None) -> str: | |
"""Returns a datasetname from its id. | |
:param str dataset_id: | |
:param workspace_id: | |
:type workspace_id: str or None | |
:raises DatasetNameNotFoundException: | |
:return: dataset id | |
:rtype: str | |
""" | |
workspace_id = self.workspace_id if workspace_id is None else workspace_id | |
relative_url = join(self.base_url, f"groups/{workspace_id}/datasets") | |
response = requests.get( | |
relative_url, headers=self.headers, timeout=REST_TIMEOUT | |
) | |
if not response.ok: | |
response.raise_for_status() | |
datasets = response.json()["value"] | |
for dataset in datasets: | |
if dataset["id"] != dataset_id: | |
pass | |
if dataset["isRefreshable"] is True: | |
return dataset["name"] | |
raise DatasetNameNotFoundException(dataset_id) | |
def refresh_dataset(self, dataset_id, workspace_id=None): | |
"""Refreshes a dataset by id. | |
:param str dataset_id: | |
:param workspace_id: | |
:type workspace_id: str or None | |
:raises DatasetRefreshFailedException: | |
:return: None | |
:rtype: None | |
""" | |
workspace_id = self.workspace_id if workspace_id is None else workspace_id | |
relative_url = join( | |
self.base_url, f"groups/{workspace_id}/datasets/{dataset_id}/refreshes" | |
) | |
response = requests.post( | |
relative_url, headers=self.headers, timeout=REST_TIMEOUT | |
) | |
if response.ok: | |
error_counter = 0 | |
error_limit = 5 | |
status = None | |
print( | |
f"Dataset {self.get_dataset_name(dataset_id, workspace_id)} refresh has been triggered successfully." | |
) | |
while error_counter < error_limit or status == "Unknown": | |
try: | |
status = self.get_dataset_refresh_status(dataset_id, workspace_id) | |
except requests.HTTPError: | |
error_counter += 1 | |
time.sleep(READ_STATUS_TIMER) | |
continue | |
if status == "Failed": | |
raise DatasetRefreshFailedException(self, workspace_id, dataset_id) | |
if status == "Completed": | |
error_counter = 0 | |
break | |
if error_counter > error_limit: | |
raise FailedToGetStatusException( | |
workspace_id, dataset_id, error_counter | |
) | |
else: | |
print( | |
f"Failed to trigger dataset" | |
f"{self.get_dataset_name(dataset_id, workspace_id)} refresh." | |
) | |
print("Response status code:", response.status_code) | |
print("Response content:", response.content) | |
response.raise_for_status() | |
raise DatasetRefreshFailedException(self, workspace_id, dataset_id) | |
def get_dataset_refresh_status(self, dataset_id, workspace_id=None) -> str: | |
"""Gets the refresh status of a dataset by its dataset id. | |
:param str dataset_id: | |
:param workspace_id: | |
:type workspace_id: str or None | |
:return: The status of dataset refresh (current or previous). | |
:rtype: str | |
""" | |
workspace_id = self.workspace_id if workspace_id is None else workspace_id | |
relative_url = join( | |
self.base_url, | |
f"groups/{workspace_id}/datasets/{dataset_id}/refreshes?$top=1", | |
) | |
response = requests.get( | |
relative_url, headers=self.headers, timeout=REST_TIMEOUT | |
) | |
response.raise_for_status() | |
refresh_status = response.json()["value"] | |
status = refresh_status[0]["status"] | |
return status | |
def refresh( | |
self, | |
dataset_names: str or list(str), | |
workspace_names: str or list(str) or None = None, | |
): | |
"""Invokes refresh of PowerBI Dataset, can be a list of workspaces and datasets or just one. | |
:param workspace_names: List or comma seperated string of workspace names. | |
:type workspace_names: str or list(str) or None | |
:param dataset_names: List or comma seperated string of dataset names. | |
:type dataset_names: str or list(str) | |
:return: None. | |
:rtype: None | |
""" | |
if isinstance(workspace_names, list): | |
workspace_list = workspace_names | |
elif workspace_names is None: | |
workspace_list = [self.workspace_id] | |
else: | |
workspace_list = workspace_names.split(",") | |
if dataset_names is None: | |
raise DatasetNameBlankException() | |
else: | |
if isinstance(dataset_names, list): | |
dataset_list = dataset_names | |
else: | |
dataset_list = dataset_names.split(",") | |
for workspace_name in workspace_list: | |
workspace_id = ( | |
self.workspace_id | |
if workspace_name == self.workspace_id | |
else self.get_workspace_id(workspace_name) | |
) | |
dataset_ids = self.get_dataset_ids(dataset_list, workspace_id) | |
for dataset_id in dataset_ids: | |
self.refresh_dataset(dataset_id, workspace_id) | |
class WorkspaceNameNotFoundException(Exception): | |
"""Workspace name not found runtime exception.""" | |
def __init__(self, workspace_name): | |
message = f"workspace {workspace_name} Not Found" | |
super().__init__(message) | |
class DatasetNameNotFoundException(Exception): | |
"""Dataset name not found runtime exception.""" | |
def __init__(self, dataset_name): | |
message = f"Dataset Name {dataset_name} Not Found" | |
super().__init__(message) | |
class DatasetNameBlankException(Exception): | |
"""Dataset name was blank.""" | |
def __init__(self): | |
message = "Dataset Name cannot be blank" | |
super().__init__(message) | |
class DatasetRefreshFailedException(Exception): | |
"""Refresh of dataset was not successful""" | |
def __init__(self, pbi_refr_tools, workspace_id, dataset_id): | |
workspace_name = "" | |
message = ( | |
f"Dataset {pbi_refr_tools.get_pbi_dataset_name(dataset_id)} ({dataset_id})" | |
+ f"in workspace {workspace_name} ({workspace_id}) failed to refresh." | |
) | |
super().__init__(message) | |
class FailedToGetStatusException(Exception): | |
"""Failed to get status during refresh""" | |
def __init__(self, workspace, dataset, retries): | |
message = f"Dataset {dataset} in {workspace} failed to get status, after {retries} retries." | |
super().__init__(message) | |
if __name__ == "__main__": | |
TENANT_ID = ( | |
"xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxxx" # replace with your azure tenant id | |
) | |
APP_CLIENT_ID = "<ClientIDName>" | |
APP_CLIENT_SECRET = "<ClientIDSecret>" | |
AUTH_TOKEN = AuthToken( | |
TENANT_ID, APP_CLIENT_ID, APP_CLIENT_SECRET, keyvault="<KeyVaultName>" | |
) | |
WORKSPACE_NAME = "<WorkspaceName>" # choose whichever workspace is applicable | |
PBI_REFRESH = PowerBIRefresh(WORKSPACE_NAME, AUTH_TOKEN) | |
DATASET_NAME = "<DatasetName>" | |
PBI_REFRESH.refresh(DATASET_NAME) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment