Skip to content

Instantly share code, notes, and snippets.

@fastlorenzo
Created April 1, 2022 09:11
Show Gist options
  • Save fastlorenzo/8dffdcdee6a927c8ecf5989ddc9f2a3d to your computer and use it in GitHub Desktop.
Save fastlorenzo/8dffdcdee6a927c8ecf5989ddc9f2a3d to your computer and use it in GitHub Desktop.
azure_rm_keyvaultsecret_info.py with lastest version of azure python sdk
#!/usr/bin/python
#
# Copyright (c) 2019 Jose Angel Munoz, <[email protected]>
#
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
from ansible_collections.azure.azcollection.plugins.module_utils.azure_rm_common import AzureRMModuleBase, AzureRMAuth
from ansible_collections.azure.azcollection.plugins.module_utils.cred_wrapper import CredentialWrapper
__metaclass__ = type
DOCUMENTATION = '''
---
module: azure_rm_keyvaultsecret_info
version_added: "0.1.2"
short_description: Get Azure Key Vault secret facts
description:
- Get facts of Azure Key Vault secret.
options:
vault_uri:
description:
- Vault uri where the secret stored in.
required: True
type: str
name:
description:
- Secret name. If not set, will list all secrets in vault_uri.
type: str
version:
description:
- Secret version.
- Set it to C(current) to show latest version of a secret.
- Set it to C(all) to list all versions of a secret.
- Set it to specific version to list specific version of a secret. eg. fd2682392a504455b79c90dd04a1bf46
default: current
type: str
show_deleted_secret:
description:
- Set to I(show_delete_secret=true) to show deleted secrets. Set to I(show_deleted_secret=false) to show not deleted secrets.
type: bool
default: false
tags:
description:
- Limit results by providing a list of tags. Format tags as 'key' or 'key:value'.
type: dict
extends_documentation_fragment:
- azure.azcollection.azure
author:
- Jose Angel Munoz (@imjoseangel)
'''
EXAMPLES = '''
- name: Get latest version of specific secret
azure_rm_keyvaultsecret_info:
vault_uri: "https://myVault.vault.azure.net"
name: mySecret
- name: List all versions of specific secret
azure_rm_keyvaultsecret_info:
vault_uri: "https://myVault.vault.azure.net"
name: mySecret
version: all
- name: List specific version of specific secret
azure_rm_keyvaultsecret_info:
vault_uri: "https://myVault.vault.azure.net"
name: mySecret
version: fd2682392a504455b79c90dd04a1bf46
- name: List all secrets in specific key vault
azure_rm_keyvaultsecret_info:
vault_uri: "https://myVault.vault.azure.net"
- name: List deleted secrets in specific key vault
azure_rm_keyvaultsecret_info:
vault_uri: "https://myVault.vault.azure.net"
show_deleted_secret: True
'''
RETURN = '''
secrets:
description:
- List of secrets in Azure Key Vault.
returned: always
type: complex
contains:
sid:
description:
- Secret identifier.
returned: always
type: str
sample: "https://myVault.vault.azure.net/flexsecret/secret1/fd2682392a504455b79c90dd04a1bf46"
version:
description:
- Secret version.
type: str
returned: always
sample: fd2682392a504455b79c90dd04a1bf46
secret:
description: secret value.
type: str
returned: always
sample: mysecretvault
tags:
description:
- Tags of the secret.
returned: always
type: dict
sample: {"delete": "on-exit"}
content_type:
description:
- Content type (optional)
returned: always
type: str
sample: mysecrettype
attributes:
description:
- Secret attributes.
type: dict
contains:
created:
description:
- Creation datetime.
returned: always
type: str
sample: "2019-04-25T07:26:49+00:00"
not_before:
description:
- Not before datetime.
type: str
sample: "2019-04-25T07:26:49+00:00"
expires:
description:
- Expiration datetime.
type: str
sample: "2019-04-25T07:26:49+00:00"
updated:
description:
- Update datetime.
returned: always
type: str
sample: "2019-04-25T07:26:49+00:00"
enabled:
description:
- Indicate whether the secret is enabled.
returned: always
type: str
sample: true
recovery_level:
description:
- Reflects the deletion recovery level currently in effect for secrets in the current vault.
- If it contains 'Purgeable' the secret can be permanently deleted by a privileged user,
- Otherwise, only the system can purge the secret, at the end of the retention interval.
returned: always
type: str
sample: Recoverable+Purgeable
'''
try:
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient, KeyVaultSecret, SecretProperties, DeletedSecret, KeyVaultSecretIdentifier
from azure.core.exceptions import ResourceNotFoundError, HttpResponseError
from azure.common.credentials import ServicePrincipalCredentials
from msrestazure.azure_active_directory import MSIAuthentication, AADTokenCredentials
except ImportError:
# This is handled in azure_rm_common
pass
# def secretbundle_to_dict(bundle):
# return dict(tags=bundle.tags,
# attributes=dict(
# enabled=bundle.attributes.enabled,
# not_before=bundle.attributes.not_before,
# expires=bundle.attributes.expires,
# created=bundle.attributes.created,
# updated=bundle.attributes.updated,
# recovery_level=bundle.attributes.recovery_level),
# sid=bundle.id,
# version=KeyVaultSecretIdentifier(bundle.id).version,
# content_type=bundle.content_type,
# secret=bundle.value)
def deletedsecretbundle_to_dict(bundle: DeletedSecret):
secretbundle = secretbundle_to_dict(bundle)
secretbundle['recovery_id'] = bundle.recovery_id,
secretbundle['scheduled_purge_date'] = bundle.scheduled_purge_date,
secretbundle['deleted_date'] = bundle.deleted_date
return secretbundle
def secretitem_to_dict(secretitem: KeyVaultSecret):
return dict(
id=secretitem.id,
name=secretitem.name,
properties=secretproperties_to_dict(secretitem.properties),
value=secretitem.value,
)
def deletedsecretitem_to_dict(secretitem: DeletedSecret):
return dict(
id=secretitem.id,
deleted_date=secretitem.deleted_date,
name=secretitem.name,
properties=secretproperties_to_dict(secretitem.properties),
recovery_id=secretitem.recovery_id,
scheduled_purge_date=secretitem.scheduled_purge_date,
)
def secretproperties_to_dict(secretproperties: SecretProperties):
return dict(
content_type=secretproperties.content_type,
created_on=secretproperties.created_on,
enabled=secretproperties.enabled,
expires_on=secretproperties.expires_on,
id=secretproperties.id,
key_id=secretproperties.key_id,
managed=secretproperties.managed,
name=secretproperties.name,
not_before=secretproperties.not_before,
recoverable_days=secretproperties.recoverable_days,
recovery_level=secretproperties.recovery_level,
tags=secretproperties.tags,
updated_on=secretproperties.updated_on,
vault_url=secretproperties.vault_url,
version=secretproperties.version,
)
class AzureRMKeyVaultSecretInfo(AzureRMModuleBase):
def __init__(self):
self.module_arg_spec = dict(version=dict(type='str',
default='current'),
name=dict(type='str'),
vault_uri=dict(type='str', required=True),
show_deleted_secret=dict(type='bool',
default=False),
tags=dict(type='dict'))
self.vault_uri = None
self.name = None
self.version = None
self.show_deleted_secret = False
self.tags = None
self.results = dict(changed=False)
self._client: SecretClient = None
super(AzureRMKeyVaultSecretInfo,
self).__init__(derived_arg_spec=self.module_arg_spec,
supports_check_mode=True,
supports_tags=False)
def exec_module(self, **kwargs):
"""Main module execution method"""
for secret in list(self.module_arg_spec.keys()):
if hasattr(self, secret):
setattr(self, secret, kwargs[secret])
self._client = self.get_keyvault_client()
if self.name:
if self.show_deleted_secret:
self.results['secrets'] = self.get_deleted_secret()
else:
if self.version == 'all':
self.results['secrets'] = self.get_secret_versions()
else:
self.results['secrets'] = self.get_secret()
else:
if self.show_deleted_secret:
self.results['secrets'] = self.list_deleted_secrets()
else:
self.results['secrets'] = ['No longer supported']
return self.results
def get_keyvault_client(self):
credentials = DefaultAzureCredential()
client = SecretClient(vault_url=self.vault_uri, credential=credentials)
return client
# Don't use MSI credentials if the auth_source isn't set to MSI. The below will Always result in credentials when running on an Azure VM.
if self.module.params['auth_source'] == 'msi':
try:
self.log("Get KeyVaultClient from MSI")
resource = self.azure_auth._cloud_environment.suffixes.keyvault_dns.split(
'.', 1).pop()
credentials = MSIAuthentication(
resource="https://{0}".format(resource))
return SecretClient(vault_url=self.vault_uri, credential=credentials)
except Exception:
self.log("Get KeyVaultClient from service principal")
elif self.module.params['auth_source'] == 'cli':
try:
self.log("Get KeyVaultClient from CLI")
auth_options = dict(
auth_source=self.module.params['auth_source'],
)
# azure_auth = AzureRMAuth(**self.module.params)
# token = azure_auth._get_azure_cli_credentials(subscription_id=self.module.params['subscription_id'], resource='https://vault.azure.net')
# client = KeyVaultClient(aad)
authcredential = DefaultAzureCredential()
# token = CredentialWrapper(credential=authcredential, resource_id='https://vault.azure.net')
# raise Exception(token)
# creds = AzureCliCredential()
# aad = AADTokenCredentials(token)
client = SecretClient(
vault_url=self.vault_uri, credential=authcredential)
#raise Exception('Error: ' + pformat(creds))
#client = KeyVaultClient(client)
return client
except Exception as e:
self.log("Exception getting KeyVaultClient from CLI")
raise e
# else:
# try:
# self.log("Get KeyVaultClient from MSI")
# credentials = MSIAuthentication(resource='https://vault.azure.net')
# return KeyVaultClient(credentials)
# except Exception:
# self.log("Get KeyVaultClient from service principal")
# Create KeyVault Client using KeyVault auth class and auth_callback
def auth_callback(server, resource, scope):
if ('client_id' in self.credentials and self.credentials['client_id'] is None) or ('secret' in self.credentials and self.credentials['secret'] is None) and self.module.params['auth_source'] != 'cli':
self.fail(
'Please specify client_id, secret and tenant to access azure Key Vault.'
)
tenant = self.credentials.get('tenant')
if not tenant:
tenant = "common"
if self.module.params['auth_source'] == 'cli':
authcredential = DefaultAzureCredential()
token = CredentialWrapper(
credential=authcredential, resource_id='https://vault.azure.net')
# raise Exception(token)
return token['token_type'], token['access_token']
else:
authcredential = ServicePrincipalCredentials(
client_id=self.credentials['client_id'],
secret=self.credentials['secret'],
tenant=tenant,
cloud_environment=self._cloud_environment,
resource="https://vault.azure.net")
token = authcredential.token
return token['token_type'], token['access_token']
return KeyVaultClient(KeyVaultAuthentication(auth_callback))
def get_secret(self):
'''
Gets the properties of the specified secret in key vault.
:return: deserialized secret state dictionary
'''
self.log("Get the secret {0}".format(self.name))
results = []
try:
if self.version == 'current':
response = self._client.get_secret(
name=self.name,
version='')
else:
response = self._client.get_secret(
name=self.name,
version=self.version)
if response: # and self.has_tags(response.tags, self.tags):
self.log("Response : {0}".format(response))
results.append(secretitem_to_dict(response))
except ResourceNotFoundError as e:
self.log("Did not find the key vault secret {0}: {1}".format(
self.name, str(e)))
except HttpResponseError as e:
self.log(
f'Error retrieving key vault secret {self.name}: {str(e)}')
raise e
return results
def get_secret_versions(self):
'''
Lists secrets versions.
:return: deserialized versions of secret, includes secret identifier, attributes and tags
'''
self.log("Get the secret versions {0}".format(self.name))
results = []
try:
response = self._client.list_properties_of_secret_versions(
name=self.name)
self.log("Response : {0}".format(response))
if response:
for item in response:
results.append(secretproperties_to_dict(item))
except ResourceNotFoundError as e:
self.log("Did not find the key vault secret versions {0}: {1}".format(
self.name, str(e)))
except HttpResponseError as e:
self.log(
f'Error retrieving key vault secret versions {self.name}: {str(e)}')
raise e
return results
def get_deleted_secret(self):
'''
Gets the properties of the specified deleted secret in key vault.
:return: deserialized secret state dictionary
'''
self.log("Get the secret {0}".format(self.name))
results = []
try:
response = self._client.get_deleted_secret(name=self.name)
if response: # and self.has_tags(response.tags, self.tags):
self.log("Response : {0}".format(response))
results.append(deletedsecretitem_to_dict(response))
except ResourceNotFoundError as e:
self.log("Did not find the key vault deleted secret {0}: {1}".format(
self.name, str(e)))
except HttpResponseError as e:
self.log(
f'Error retrieving key vault deleted secret {self.name}: {str(e)}')
raise e
return results
def list_deleted_secrets(self):
'''
Lists deleted secrets in specific key vault.
:return: deserialized secrets, includes secret identifier, attributes and tags.
'''
self.log("Get the key vaults in current subscription")
results = []
try:
response = self._client.list_deleted_secrets()
self.log("Response : {0}".format(response))
if response:
for item in response:
# if self.has_tags(item.tags, self.tags):
results.append(deletedsecretitem_to_dict(item))
except ResourceNotFoundError as e:
self.log("Did not find key vault deleted secrets {0}: {1}".format(
self.name, str(e)))
except HttpResponseError as e:
self.log(
f'Error retrieving key vault deleted secrets {self.name}: {str(e)}')
raise e
return results
def main():
"""Main execution"""
AzureRMKeyVaultSecretInfo()
if __name__ == '__main__':
main()
# Source: https://gist.github.com/lmazuel/cc683d82ea1d7b40208de7c9fc8de59d
# Info: https://docs.microsoft.com/en-us/azure/developer/python/azure-sdk-authenticate-hosted-applications#credential-object-has-no-attribute-signed_session
# Wrap credentials from azure-identity to be compatible with SDK that needs msrestazure or azure.common.credentials
# Need msrest >= 0.6.0
# See also https://pypi.org/project/azure-identity/
from msrest.authentication import BasicTokenAuthentication
from azure.core.pipeline.policies import BearerTokenCredentialPolicy
from azure.core.pipeline import PipelineRequest, PipelineContext
from azure.core.pipeline.transport import HttpRequest
from azure.identity import DefaultAzureCredential
class CredentialWrapper(BasicTokenAuthentication):
def __init__(self, credential=None, resource_id="https://management.azure.com/.default", **kwargs):
"""Wrap any azure-identity credential to work with SDK that needs azure.common.credentials/msrestazure.
Default resource is ARM (syntax of endpoint v2)
:param credential: Any azure-identity credential (DefaultAzureCredential by default)
:param str resource_id: The scope to use to get the token (default ARM)
"""
super(CredentialWrapper, self).__init__(None)
if credential is None:
credential = DefaultAzureCredential()
self._policy = BearerTokenCredentialPolicy(credential, resource_id, **kwargs)
def _make_request(self):
return PipelineRequest(
HttpRequest(
"CredentialWrapper",
"https://fakeurl"
),
PipelineContext(None)
)
def set_token(self):
"""Ask the azure-core BearerTokenCredentialPolicy policy to get a token.
Using the policy gives us for free the caching system of azure-core.
We could make this code simpler by using private method, but by definition
I can't assure they will be there forever, so mocking a fake call to the policy
to extract the token, using 100% public API."""
request = self._make_request()
self._policy.on_request(request)
# Read Authorization, and get the second part after Bearer
token = request.http_request.headers["Authorization"].split(" ", 1)[1]
self.token = {"access_token": token}
def signed_session(self, session=None):
self.set_token()
return super(CredentialWrapper, self).signed_session(session)
packaging
requests[security]
xmltodict
azure-cli-core==2.34.0
azure-common==1.1.11
azure-identity==1.8.0
azure-mgmt-apimanagement==0.2.0
azure-mgmt-authorization==0.51.1
azure-mgmt-batch==5.0.1
azure-mgmt-cdn==3.0.0
azure-mgmt-compute==23.1.0
azure-mgmt-containerinstance==1.4.0
azure-mgmt-containerregistry==2.0.0
azure-mgmt-containerservice==9.1.0
azure-mgmt-datalake-store==0.5.0
azure-mgmt-dns==2.1.0
azure-mgmt-keyvault==1.1.0
azure-mgmt-marketplaceordering==0.1.0
azure-mgmt-monitor==3.0.0
azure-mgmt-managedservices==1.0.0
azure-mgmt-managementgroups==0.2.0
azure-mgmt-network==19.1.0
azure-mgmt-nspkg==2.0.0
azure-mgmt-privatedns==0.1.0
azure-mgmt-redis==5.0.0
azure-mgmt-resource==10.2.0
azure-mgmt-rdbms==1.9.0
azure-mgmt-search==3.0.0
azure-mgmt-servicebus==0.5.3
azure-mgmt-sql==0.10.0
azure-mgmt-storage==19.0.0
azure-mgmt-trafficmanager==0.50.0
azure-mgmt-web==0.41.0
azure-nspkg==2.0.0
azure-storage==0.35.1
msrest==0.6.21
msrestazure==0.6.4
azure-keyvault==4.2.0
azure-graphrbac==0.61.1
azure-mgmt-cosmosdb==0.5.2
azure-mgmt-hdinsight==0.1.0
azure-mgmt-devtestlabs==3.0.0
azure-mgmt-loganalytics==1.0.0
azure-mgmt-automation==0.1.1
azure-mgmt-iothub==0.7.0
azure-mgmt-recoveryservices==0.4.0
azure-mgmt-recoveryservicesbackup==0.6.0
azure-mgmt-notificationhubs==2.0.0
azure-mgmt-eventhub==2.0.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment