Created
April 1, 2022 09:11
-
-
Save fastlorenzo/8dffdcdee6a927c8ecf5989ddc9f2a3d to your computer and use it in GitHub Desktop.
azure_rm_keyvaultsecret_info.py with lastest version of azure python sdk
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# | |
# Copyright (c) 2019 Jose Angel Munoz, <[email protected]> | |
# | |
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) | |
from __future__ import absolute_import, division, print_function | |
from ansible_collections.azure.azcollection.plugins.module_utils.azure_rm_common import AzureRMModuleBase, AzureRMAuth | |
from ansible_collections.azure.azcollection.plugins.module_utils.cred_wrapper import CredentialWrapper | |
__metaclass__ = type | |
DOCUMENTATION = ''' | |
--- | |
module: azure_rm_keyvaultsecret_info | |
version_added: "0.1.2" | |
short_description: Get Azure Key Vault secret facts | |
description: | |
- Get facts of Azure Key Vault secret. | |
options: | |
vault_uri: | |
description: | |
- Vault uri where the secret stored in. | |
required: True | |
type: str | |
name: | |
description: | |
- Secret name. If not set, will list all secrets in vault_uri. | |
type: str | |
version: | |
description: | |
- Secret version. | |
- Set it to C(current) to show latest version of a secret. | |
- Set it to C(all) to list all versions of a secret. | |
- Set it to specific version to list specific version of a secret. eg. fd2682392a504455b79c90dd04a1bf46 | |
default: current | |
type: str | |
show_deleted_secret: | |
description: | |
- Set to I(show_delete_secret=true) to show deleted secrets. Set to I(show_deleted_secret=false) to show not deleted secrets. | |
type: bool | |
default: false | |
tags: | |
description: | |
- Limit results by providing a list of tags. Format tags as 'key' or 'key:value'. | |
type: dict | |
extends_documentation_fragment: | |
- azure.azcollection.azure | |
author: | |
- Jose Angel Munoz (@imjoseangel) | |
''' | |
EXAMPLES = ''' | |
- name: Get latest version of specific secret | |
azure_rm_keyvaultsecret_info: | |
vault_uri: "https://myVault.vault.azure.net" | |
name: mySecret | |
- name: List all versions of specific secret | |
azure_rm_keyvaultsecret_info: | |
vault_uri: "https://myVault.vault.azure.net" | |
name: mySecret | |
version: all | |
- name: List specific version of specific secret | |
azure_rm_keyvaultsecret_info: | |
vault_uri: "https://myVault.vault.azure.net" | |
name: mySecret | |
version: fd2682392a504455b79c90dd04a1bf46 | |
- name: List all secrets in specific key vault | |
azure_rm_keyvaultsecret_info: | |
vault_uri: "https://myVault.vault.azure.net" | |
- name: List deleted secrets in specific key vault | |
azure_rm_keyvaultsecret_info: | |
vault_uri: "https://myVault.vault.azure.net" | |
show_deleted_secret: True | |
''' | |
RETURN = ''' | |
secrets: | |
description: | |
- List of secrets in Azure Key Vault. | |
returned: always | |
type: complex | |
contains: | |
sid: | |
description: | |
- Secret identifier. | |
returned: always | |
type: str | |
sample: "https://myVault.vault.azure.net/flexsecret/secret1/fd2682392a504455b79c90dd04a1bf46" | |
version: | |
description: | |
- Secret version. | |
type: str | |
returned: always | |
sample: fd2682392a504455b79c90dd04a1bf46 | |
secret: | |
description: secret value. | |
type: str | |
returned: always | |
sample: mysecretvault | |
tags: | |
description: | |
- Tags of the secret. | |
returned: always | |
type: dict | |
sample: {"delete": "on-exit"} | |
content_type: | |
description: | |
- Content type (optional) | |
returned: always | |
type: str | |
sample: mysecrettype | |
attributes: | |
description: | |
- Secret attributes. | |
type: dict | |
contains: | |
created: | |
description: | |
- Creation datetime. | |
returned: always | |
type: str | |
sample: "2019-04-25T07:26:49+00:00" | |
not_before: | |
description: | |
- Not before datetime. | |
type: str | |
sample: "2019-04-25T07:26:49+00:00" | |
expires: | |
description: | |
- Expiration datetime. | |
type: str | |
sample: "2019-04-25T07:26:49+00:00" | |
updated: | |
description: | |
- Update datetime. | |
returned: always | |
type: str | |
sample: "2019-04-25T07:26:49+00:00" | |
enabled: | |
description: | |
- Indicate whether the secret is enabled. | |
returned: always | |
type: str | |
sample: true | |
recovery_level: | |
description: | |
- Reflects the deletion recovery level currently in effect for secrets in the current vault. | |
- If it contains 'Purgeable' the secret can be permanently deleted by a privileged user, | |
- Otherwise, only the system can purge the secret, at the end of the retention interval. | |
returned: always | |
type: str | |
sample: Recoverable+Purgeable | |
''' | |
try: | |
from azure.identity import DefaultAzureCredential | |
from azure.keyvault.secrets import SecretClient, KeyVaultSecret, SecretProperties, DeletedSecret, KeyVaultSecretIdentifier | |
from azure.core.exceptions import ResourceNotFoundError, HttpResponseError | |
from azure.common.credentials import ServicePrincipalCredentials | |
from msrestazure.azure_active_directory import MSIAuthentication, AADTokenCredentials | |
except ImportError: | |
# This is handled in azure_rm_common | |
pass | |
# def secretbundle_to_dict(bundle): | |
# return dict(tags=bundle.tags, | |
# attributes=dict( | |
# enabled=bundle.attributes.enabled, | |
# not_before=bundle.attributes.not_before, | |
# expires=bundle.attributes.expires, | |
# created=bundle.attributes.created, | |
# updated=bundle.attributes.updated, | |
# recovery_level=bundle.attributes.recovery_level), | |
# sid=bundle.id, | |
# version=KeyVaultSecretIdentifier(bundle.id).version, | |
# content_type=bundle.content_type, | |
# secret=bundle.value) | |
def deletedsecretbundle_to_dict(bundle: DeletedSecret): | |
secretbundle = secretbundle_to_dict(bundle) | |
secretbundle['recovery_id'] = bundle.recovery_id, | |
secretbundle['scheduled_purge_date'] = bundle.scheduled_purge_date, | |
secretbundle['deleted_date'] = bundle.deleted_date | |
return secretbundle | |
def secretitem_to_dict(secretitem: KeyVaultSecret): | |
return dict( | |
id=secretitem.id, | |
name=secretitem.name, | |
properties=secretproperties_to_dict(secretitem.properties), | |
value=secretitem.value, | |
) | |
def deletedsecretitem_to_dict(secretitem: DeletedSecret): | |
return dict( | |
id=secretitem.id, | |
deleted_date=secretitem.deleted_date, | |
name=secretitem.name, | |
properties=secretproperties_to_dict(secretitem.properties), | |
recovery_id=secretitem.recovery_id, | |
scheduled_purge_date=secretitem.scheduled_purge_date, | |
) | |
def secretproperties_to_dict(secretproperties: SecretProperties): | |
return dict( | |
content_type=secretproperties.content_type, | |
created_on=secretproperties.created_on, | |
enabled=secretproperties.enabled, | |
expires_on=secretproperties.expires_on, | |
id=secretproperties.id, | |
key_id=secretproperties.key_id, | |
managed=secretproperties.managed, | |
name=secretproperties.name, | |
not_before=secretproperties.not_before, | |
recoverable_days=secretproperties.recoverable_days, | |
recovery_level=secretproperties.recovery_level, | |
tags=secretproperties.tags, | |
updated_on=secretproperties.updated_on, | |
vault_url=secretproperties.vault_url, | |
version=secretproperties.version, | |
) | |
class AzureRMKeyVaultSecretInfo(AzureRMModuleBase): | |
def __init__(self): | |
self.module_arg_spec = dict(version=dict(type='str', | |
default='current'), | |
name=dict(type='str'), | |
vault_uri=dict(type='str', required=True), | |
show_deleted_secret=dict(type='bool', | |
default=False), | |
tags=dict(type='dict')) | |
self.vault_uri = None | |
self.name = None | |
self.version = None | |
self.show_deleted_secret = False | |
self.tags = None | |
self.results = dict(changed=False) | |
self._client: SecretClient = None | |
super(AzureRMKeyVaultSecretInfo, | |
self).__init__(derived_arg_spec=self.module_arg_spec, | |
supports_check_mode=True, | |
supports_tags=False) | |
def exec_module(self, **kwargs): | |
"""Main module execution method""" | |
for secret in list(self.module_arg_spec.keys()): | |
if hasattr(self, secret): | |
setattr(self, secret, kwargs[secret]) | |
self._client = self.get_keyvault_client() | |
if self.name: | |
if self.show_deleted_secret: | |
self.results['secrets'] = self.get_deleted_secret() | |
else: | |
if self.version == 'all': | |
self.results['secrets'] = self.get_secret_versions() | |
else: | |
self.results['secrets'] = self.get_secret() | |
else: | |
if self.show_deleted_secret: | |
self.results['secrets'] = self.list_deleted_secrets() | |
else: | |
self.results['secrets'] = ['No longer supported'] | |
return self.results | |
def get_keyvault_client(self): | |
credentials = DefaultAzureCredential() | |
client = SecretClient(vault_url=self.vault_uri, credential=credentials) | |
return client | |
# Don't use MSI credentials if the auth_source isn't set to MSI. The below will Always result in credentials when running on an Azure VM. | |
if self.module.params['auth_source'] == 'msi': | |
try: | |
self.log("Get KeyVaultClient from MSI") | |
resource = self.azure_auth._cloud_environment.suffixes.keyvault_dns.split( | |
'.', 1).pop() | |
credentials = MSIAuthentication( | |
resource="https://{0}".format(resource)) | |
return SecretClient(vault_url=self.vault_uri, credential=credentials) | |
except Exception: | |
self.log("Get KeyVaultClient from service principal") | |
elif self.module.params['auth_source'] == 'cli': | |
try: | |
self.log("Get KeyVaultClient from CLI") | |
auth_options = dict( | |
auth_source=self.module.params['auth_source'], | |
) | |
# azure_auth = AzureRMAuth(**self.module.params) | |
# token = azure_auth._get_azure_cli_credentials(subscription_id=self.module.params['subscription_id'], resource='https://vault.azure.net') | |
# client = KeyVaultClient(aad) | |
authcredential = DefaultAzureCredential() | |
# token = CredentialWrapper(credential=authcredential, resource_id='https://vault.azure.net') | |
# raise Exception(token) | |
# creds = AzureCliCredential() | |
# aad = AADTokenCredentials(token) | |
client = SecretClient( | |
vault_url=self.vault_uri, credential=authcredential) | |
#raise Exception('Error: ' + pformat(creds)) | |
#client = KeyVaultClient(client) | |
return client | |
except Exception as e: | |
self.log("Exception getting KeyVaultClient from CLI") | |
raise e | |
# else: | |
# try: | |
# self.log("Get KeyVaultClient from MSI") | |
# credentials = MSIAuthentication(resource='https://vault.azure.net') | |
# return KeyVaultClient(credentials) | |
# except Exception: | |
# self.log("Get KeyVaultClient from service principal") | |
# Create KeyVault Client using KeyVault auth class and auth_callback | |
def auth_callback(server, resource, scope): | |
if ('client_id' in self.credentials and self.credentials['client_id'] is None) or ('secret' in self.credentials and self.credentials['secret'] is None) and self.module.params['auth_source'] != 'cli': | |
self.fail( | |
'Please specify client_id, secret and tenant to access azure Key Vault.' | |
) | |
tenant = self.credentials.get('tenant') | |
if not tenant: | |
tenant = "common" | |
if self.module.params['auth_source'] == 'cli': | |
authcredential = DefaultAzureCredential() | |
token = CredentialWrapper( | |
credential=authcredential, resource_id='https://vault.azure.net') | |
# raise Exception(token) | |
return token['token_type'], token['access_token'] | |
else: | |
authcredential = ServicePrincipalCredentials( | |
client_id=self.credentials['client_id'], | |
secret=self.credentials['secret'], | |
tenant=tenant, | |
cloud_environment=self._cloud_environment, | |
resource="https://vault.azure.net") | |
token = authcredential.token | |
return token['token_type'], token['access_token'] | |
return KeyVaultClient(KeyVaultAuthentication(auth_callback)) | |
def get_secret(self): | |
''' | |
Gets the properties of the specified secret in key vault. | |
:return: deserialized secret state dictionary | |
''' | |
self.log("Get the secret {0}".format(self.name)) | |
results = [] | |
try: | |
if self.version == 'current': | |
response = self._client.get_secret( | |
name=self.name, | |
version='') | |
else: | |
response = self._client.get_secret( | |
name=self.name, | |
version=self.version) | |
if response: # and self.has_tags(response.tags, self.tags): | |
self.log("Response : {0}".format(response)) | |
results.append(secretitem_to_dict(response)) | |
except ResourceNotFoundError as e: | |
self.log("Did not find the key vault secret {0}: {1}".format( | |
self.name, str(e))) | |
except HttpResponseError as e: | |
self.log( | |
f'Error retrieving key vault secret {self.name}: {str(e)}') | |
raise e | |
return results | |
def get_secret_versions(self): | |
''' | |
Lists secrets versions. | |
:return: deserialized versions of secret, includes secret identifier, attributes and tags | |
''' | |
self.log("Get the secret versions {0}".format(self.name)) | |
results = [] | |
try: | |
response = self._client.list_properties_of_secret_versions( | |
name=self.name) | |
self.log("Response : {0}".format(response)) | |
if response: | |
for item in response: | |
results.append(secretproperties_to_dict(item)) | |
except ResourceNotFoundError as e: | |
self.log("Did not find the key vault secret versions {0}: {1}".format( | |
self.name, str(e))) | |
except HttpResponseError as e: | |
self.log( | |
f'Error retrieving key vault secret versions {self.name}: {str(e)}') | |
raise e | |
return results | |
def get_deleted_secret(self): | |
''' | |
Gets the properties of the specified deleted secret in key vault. | |
:return: deserialized secret state dictionary | |
''' | |
self.log("Get the secret {0}".format(self.name)) | |
results = [] | |
try: | |
response = self._client.get_deleted_secret(name=self.name) | |
if response: # and self.has_tags(response.tags, self.tags): | |
self.log("Response : {0}".format(response)) | |
results.append(deletedsecretitem_to_dict(response)) | |
except ResourceNotFoundError as e: | |
self.log("Did not find the key vault deleted secret {0}: {1}".format( | |
self.name, str(e))) | |
except HttpResponseError as e: | |
self.log( | |
f'Error retrieving key vault deleted secret {self.name}: {str(e)}') | |
raise e | |
return results | |
def list_deleted_secrets(self): | |
''' | |
Lists deleted secrets in specific key vault. | |
:return: deserialized secrets, includes secret identifier, attributes and tags. | |
''' | |
self.log("Get the key vaults in current subscription") | |
results = [] | |
try: | |
response = self._client.list_deleted_secrets() | |
self.log("Response : {0}".format(response)) | |
if response: | |
for item in response: | |
# if self.has_tags(item.tags, self.tags): | |
results.append(deletedsecretitem_to_dict(item)) | |
except ResourceNotFoundError as e: | |
self.log("Did not find key vault deleted secrets {0}: {1}".format( | |
self.name, str(e))) | |
except HttpResponseError as e: | |
self.log( | |
f'Error retrieving key vault deleted secrets {self.name}: {str(e)}') | |
raise e | |
return results | |
def main(): | |
"""Main execution""" | |
AzureRMKeyVaultSecretInfo() | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Source: https://gist.github.com/lmazuel/cc683d82ea1d7b40208de7c9fc8de59d | |
# Info: https://docs.microsoft.com/en-us/azure/developer/python/azure-sdk-authenticate-hosted-applications#credential-object-has-no-attribute-signed_session | |
# Wrap credentials from azure-identity to be compatible with SDK that needs msrestazure or azure.common.credentials | |
# Need msrest >= 0.6.0 | |
# See also https://pypi.org/project/azure-identity/ | |
from msrest.authentication import BasicTokenAuthentication | |
from azure.core.pipeline.policies import BearerTokenCredentialPolicy | |
from azure.core.pipeline import PipelineRequest, PipelineContext | |
from azure.core.pipeline.transport import HttpRequest | |
from azure.identity import DefaultAzureCredential | |
class CredentialWrapper(BasicTokenAuthentication): | |
def __init__(self, credential=None, resource_id="https://management.azure.com/.default", **kwargs): | |
"""Wrap any azure-identity credential to work with SDK that needs azure.common.credentials/msrestazure. | |
Default resource is ARM (syntax of endpoint v2) | |
:param credential: Any azure-identity credential (DefaultAzureCredential by default) | |
:param str resource_id: The scope to use to get the token (default ARM) | |
""" | |
super(CredentialWrapper, self).__init__(None) | |
if credential is None: | |
credential = DefaultAzureCredential() | |
self._policy = BearerTokenCredentialPolicy(credential, resource_id, **kwargs) | |
def _make_request(self): | |
return PipelineRequest( | |
HttpRequest( | |
"CredentialWrapper", | |
"https://fakeurl" | |
), | |
PipelineContext(None) | |
) | |
def set_token(self): | |
"""Ask the azure-core BearerTokenCredentialPolicy policy to get a token. | |
Using the policy gives us for free the caching system of azure-core. | |
We could make this code simpler by using private method, but by definition | |
I can't assure they will be there forever, so mocking a fake call to the policy | |
to extract the token, using 100% public API.""" | |
request = self._make_request() | |
self._policy.on_request(request) | |
# Read Authorization, and get the second part after Bearer | |
token = request.http_request.headers["Authorization"].split(" ", 1)[1] | |
self.token = {"access_token": token} | |
def signed_session(self, session=None): | |
self.set_token() | |
return super(CredentialWrapper, self).signed_session(session) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
packaging | |
requests[security] | |
xmltodict | |
azure-cli-core==2.34.0 | |
azure-common==1.1.11 | |
azure-identity==1.8.0 | |
azure-mgmt-apimanagement==0.2.0 | |
azure-mgmt-authorization==0.51.1 | |
azure-mgmt-batch==5.0.1 | |
azure-mgmt-cdn==3.0.0 | |
azure-mgmt-compute==23.1.0 | |
azure-mgmt-containerinstance==1.4.0 | |
azure-mgmt-containerregistry==2.0.0 | |
azure-mgmt-containerservice==9.1.0 | |
azure-mgmt-datalake-store==0.5.0 | |
azure-mgmt-dns==2.1.0 | |
azure-mgmt-keyvault==1.1.0 | |
azure-mgmt-marketplaceordering==0.1.0 | |
azure-mgmt-monitor==3.0.0 | |
azure-mgmt-managedservices==1.0.0 | |
azure-mgmt-managementgroups==0.2.0 | |
azure-mgmt-network==19.1.0 | |
azure-mgmt-nspkg==2.0.0 | |
azure-mgmt-privatedns==0.1.0 | |
azure-mgmt-redis==5.0.0 | |
azure-mgmt-resource==10.2.0 | |
azure-mgmt-rdbms==1.9.0 | |
azure-mgmt-search==3.0.0 | |
azure-mgmt-servicebus==0.5.3 | |
azure-mgmt-sql==0.10.0 | |
azure-mgmt-storage==19.0.0 | |
azure-mgmt-trafficmanager==0.50.0 | |
azure-mgmt-web==0.41.0 | |
azure-nspkg==2.0.0 | |
azure-storage==0.35.1 | |
msrest==0.6.21 | |
msrestazure==0.6.4 | |
azure-keyvault==4.2.0 | |
azure-graphrbac==0.61.1 | |
azure-mgmt-cosmosdb==0.5.2 | |
azure-mgmt-hdinsight==0.1.0 | |
azure-mgmt-devtestlabs==3.0.0 | |
azure-mgmt-loganalytics==1.0.0 | |
azure-mgmt-automation==0.1.1 | |
azure-mgmt-iothub==0.7.0 | |
azure-mgmt-recoveryservices==0.4.0 | |
azure-mgmt-recoveryservicesbackup==0.6.0 | |
azure-mgmt-notificationhubs==2.0.0 | |
azure-mgmt-eventhub==2.0.0 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment