# Extracting metadata from Azure Purview with Synapse Spark Pools
# Reusable Functions

def azuread_auth(tenant_id: str, client_id: str, client_secret: str, resource_url: str):
    """
    Authenticates the Service Principal against the provided resource URL and returns the OAuth access token.
    """
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
    payload = f'grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&resource={resource_url}'
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    response = requests.request("POST", url, headers=headers, data=payload)
    access_token = json.loads(response.text)['access_token']
    return access_token
def purview_auth(tenant_id: str, client_id: str, client_secret: str, data_catalog_name: str):
    """
    Authenticates to the Atlas endpoint and returns a client object.
    """
    oauth = ServicePrincipalAuthentication(
        tenant_id = tenant_id,
        client_id = client_id,
        client_secret = client_secret
    )
    client = PurviewClient(
        account_name = data_catalog_name,
        authentication = oauth
    )
    return client
def get_all_adls_assets(path: str, data_catalog_name: str, azuread_access_token: str, max_depth=1):
    """
    Retrieves all scanned assets under the specified ADLS storage account container
    by recursively calling the Purview browse API. Leaf assets (files) are yielded as
    they are found; folders are only yielded once max_depth is reached.
    """
    # List all entries under the current path
    url = f"https://{data_catalog_name}.catalog.purview.azure.com/api/browse"
    headers = {
        'Authorization': f'Bearer {azuread_access_token}',
        'Content-Type': 'application/json'
    }
    payload = """{
        "limit": 100,
        "offset": null,
        "path": "%s"
    }""" % (path)
    response = requests.request("POST", url, headers=headers, data=payload)
    li = json.loads(response.text)

    # Return all files
    for x in jmespath.search("value", li):
        if jmespath.search("isLeaf", x):
            yield x

    # If the max_depth has not been reached, start
    # listing files and folders in subdirectories
    if max_depth > 1:
        for x in jmespath.search("value", li):
            if jmespath.search("isLeaf", x):
                continue
            for y in get_all_adls_assets(jmespath.search("path", x), data_catalog_name, azuread_access_token, max_depth - 1):
                yield y

    # If max_depth has been reached,
    # return the folders
    else:
        for x in jmespath.search("value", li):
            if not jmespath.search("isLeaf", x):
                yield x
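# Illustrative shape of a single browse entry yielded above (all values are hypothetical;
# only the keys this script relies on -- 'id', 'name', 'path' and 'isLeaf' -- are shown):
#
# {
#     "id": "00000000-0000-0000-0000-000000000000",
#     "name": "data_{N}.json",
#     "isLeaf": true,
#     "path": "/azure_storage_account#.../azure_datalake_gen2_filesystem#your-container-name/data_{N}.json"
# }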
def get_adls_asset_schema(assets_all: list, asset: str, purview_client):
    """
    Returns the asset schema and classifications from Purview.
    """
    # Filter the asset list for the asset of interest
    assets_list = list(filter(lambda i: i['name'] == asset, assets_all))

    # Find the guid for the asset to retrieve the tabular_schema or attachedSchema (based on the asset type)
    match_id = ""
    for entry in assets_list:
        # Retrieve the asset definition from the Atlas client
        response = purview_client.get_entity(entry['id'])
        # The API response differs based on the asset type (decided here by file extension)
        if asset.rsplit('.', 1)[-1] == "json":
            filtered_response = jmespath.search("entities[?source=='DataScan'].[relationshipAttributes.attachedSchema[0].guid]", response)
        else:
            filtered_response = jmespath.search("entities[?source=='DataScan'].[relationshipAttributes.tabular_schema.guid]", response)
        # Update match_id if the source is DataScan
        if filtered_response:
            match_id = filtered_response[0][0]

    # Retrieve the schema based on the guid match
    response = purview_client.get_entity(match_id)
    asset_schema = jmespath.search("[referredEntities.*.[attributes.name, classifications[0].[typeName][0]]]", response)[0]
    return asset_schema
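# Illustrative return value (hypothetical column names and classifications): a list of
# [column_name, classification] pairs, with None where a column carries no classification, e.g.
#
# [['CustomerEmail', 'MICROSOFT.PERSONAL.EMAIL'], ['OrderId', None]]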
def deep_ls(path: str, max_depth=1):
    """
    List all files and folders in the specified path and
    its subfolders within the maximum recursion depth.
    """
    # List all entries in path
    li = mssparkutils.fs.ls(path)

    # Return all files
    for x in li:
        if x.size != 0:
            yield x

    # If the max_depth has not been reached, start
    # listing files and folders in subdirectories
    if max_depth > 1:
        for x in li:
            if x.size != 0:
                continue
            for y in deep_ls(x.path, max_depth - 1):
                yield y

    # If max_depth has been reached,
    # return the folders
    else:
        for x in li:
            if x.size == 0:
                yield x
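# Minimal usage sketch (the abfss path below is a placeholder); note that deep_ls
# distinguishes files from folders purely by size, so zero-byte files are not
# treated as files by this helper:
#
# for f in deep_ls("abfss://your-container-name@your-storage-account.dfs.core.windows.net/", max_depth=3):
#     print(f.path, f.size)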
def convertfiles2df(files):
    """
    Converts a list of FileInfo objects into a Pandas DataFrame to enable display.
    """
    # Disable Arrow-based transfers since the Pandas DataFrame is tiny
    spark.conf.set("spark.sql.execution.arrow.enabled", "false")

    schema = ['path', 'name', 'size']
    df = pd.DataFrame([[getattr(i, j) for j in schema] for i in files], columns=schema).sort_values('path')
    return df
# Example Implementation
# ----------------------

# Library imports
import os
import requests
import json
import jmespath
import pandas as pd
from notebookutils import mssparkutils
from pprint import pprint
from pyapacheatlas.auth import ServicePrincipalAuthentication
from pyapacheatlas.core import PurviewClient, AtlasEntity, AtlasProcess, TypeCategory
from pyapacheatlas.core.typedef import *
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Authentication
# Service Principal with "Purview Data Source Administrator" permissions on Purview
tenant_id = "your-tenant-id"
client_id = "service-principal-client-id"
client_secret = "service-principal-client-secret"
resource_url = "https://purview.azure.net"
data_catalog_name = "your-purview-service-name"

# Retrieve authentication objects
azuread_access_token = azuread_auth(tenant_id, client_id, client_secret, resource_url)
purview_client = purview_auth(tenant_id, client_id, client_secret, data_catalog_name)
# Asset details
# Asset parameters
storage_account = "your-storage-account"
container = "your-container-name"

# The root-level path we want to begin populating assets from
top_path = f"/azure_storage_account#{storage_account}.core.windows.net/azure_datalake_gen2_service#{storage_account}.dfs.core.windows.net/azure_datalake_gen2_filesystem#{container}"

# Retrieve the full list of assets
assets_all = list(get_all_adls_assets(top_path, data_catalog_name, azuread_access_token, max_depth=20))
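# Optional sanity check: confirm the browse API returned assets before deriving schemas
# (the slice size below is arbitrary)
print(f"Discovered {len(assets_all)} assets under {top_path}")
pprint(assets_all[:3])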
# Azure storage access info
linked_service_name = 'adls-linked-service-name-in-synapse'

# Grab the SAS token
adls_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)

# Configure Spark to access the DFS endpoint
root = 'abfss://%s@%s.dfs.core.windows.net/' % (container, storage_account)
spark.conf.set('fs.azure.sas.%s.%s.dfs.core.windows.net' % (container, storage_account), adls_sas_token)
print('Remote adls root path: ' + root)
# Get ADLS files recursively
files = list(deep_ls(root, max_depth=20))
files_df = convertfiles2df(files)  # Note: this is a Pandas DataFrame

# Generate asset-aligned names by replacing digit runs with the {N} placeholder
files_df['asset'] = files_df['name'].str.replace(r'\d+', '{N}', regex=True)
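# For example, a file named 'sales_20210101.parquet' becomes 'sales_{N}.parquet',
# which is intended to line up with the {N}-style asset names that Purview resource
# sets assign to partitioned files.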
# Append the schema row-wise from Purview
files_df['schema'] = files_df.apply(lambda row: get_adls_asset_schema(assets_all, row['asset'], purview_client), axis=1)

# Display the asset DataFrame
display(files_df)