Last active
December 21, 2021 18:58
-
-
Save salrashid123/e44944de52ede6dff86dc40a484a06d2 to your computer and use it in GitHub Desktop.
google-auth python. Impersonate and domain-delegate using impersonated_credentials
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This snippet uses Application Default Credentials (ADC) to impersonate the service account sa_1,
# then uses that service account's credentials to create a token for user2 via domain-wide delegation.
# After that, the GCS and Pub/Sub calls are made as if they came from user2.
import google.auth | |
import time | |
from google.auth import credentials | |
from google.cloud import iam_credentials_v1 | |
from google.auth import impersonated_credentials | |
import google.auth.iam | |
import base64 | |
# see https://github.com/googleapis/google-auth-library-python/issues/931 | |
from datetime import datetime, timedelta | |
from google.auth import _helpers | |
from google.auth import credentials | |
from google.auth import exceptions | |
import requests | |
class StaticCredentials(credentials.Credentials):
    """Credentials that wrap an already-issued access token.

    The token is used as-is; ``refresh`` is a no-op, so once the token
    expires a new instance has to be built from a freshly obtained token.

    Args:
        token: The access token string. Must not be None.
        expires_in: Token lifetime in seconds, counted from construction
            time. Any value accepted by ``int()`` works.
        token_type: Token type as reported by the token endpoint; stored on
            the instance as ``token_type``.

    Raises:
        google.auth.exceptions.GoogleAuthError: If ``token`` is None.
    """

    def __init__(
        self,
        token,
        expires_in,
        token_type,
    ):
        # Validate before touching any state so a rejected token never
        # leaves a half-initialised instance behind.
        if token is None:
            raise exceptions.GoogleAuthError(
                "Provided token cannot be null"
            )
        # Run the base-class initialiser. The earlier
        # ``super(credentials.Credentials, self)`` form started the MRO
        # lookup *after* credentials.Credentials and therefore skipped it.
        super().__init__()
        self.token = token
        self.token_type = token_type
        # Use the same clock as ``expired`` below; mixing local-time
        # ``datetime.now()`` with ``_helpers.utcnow()`` skews the expiry by
        # the host's UTC offset.
        self.expiry = _helpers.utcnow() + timedelta(seconds=int(expires_in))
        self._quota_project_id = None

    # NOTE: no docstring here on purpose -- ``copy_docstring`` supplies it
    # and is expected to reject a method that already has one.
    @_helpers.copy_docstring(credentials.Credentials)
    def refresh(self, request):
        # A static token cannot be refreshed; nothing to do.
        return

    @property
    def expired(self):
        """True once the current UTC time has reached ``expiry``."""
        return _helpers.utcnow() >= self.expiry

    @property
    def quota_project_id(self):
        """Project to use for quota and billing purposes."""
        return self._quota_project_id
# Background on domain-wide delegation:
# https://google-auth.readthedocs.io/en/master/reference/google.oauth2.service_account.html#domain-wide-delegation
project = 'project'
# NOTE(review): the address below looks like a redacted placeholder --
# replace it with the real service account email before running.
sa_1 = '[email protected]'

# Scopes requested for the impersonated service-account credentials.
target_scopes = ['https://www.googleapis.com/auth/cloud-platform']

# Step 1: load the source credentials via ADC (this is user_1).
source_credentials, project_id = google.auth.default()

# Step 2: use the source credentials to obtain credentials for sa_1.
sa1_credentials = impersonated_credentials.Credentials(
    source_credentials=source_credentials,
    target_principal=sa_1,
    target_scopes=target_scopes,
    delegates=[],
    lifetime=500,  # presumably seconds -- confirm against google-auth docs
)
### Use sa1_credentials to get a token for user2.
import json


def _b64url(raw):
    """Return the unpadded URL-safe base64 text for the bytes *raw*."""
    return base64.urlsafe_b64encode(raw).decode().rstrip('=')


# NOTE(review): the address below looks like a redacted placeholder --
# replace it with the email of the user to act as.
sub = "[email protected]"
target_scopes_for_dwd = "https://www.googleapis.com/auth/cloud-platform https://www.googleapis.com/auth/admin.directory.user.readonly"
now = int(time.time())
exptime = now + 3600

# Serialise the claim set with json.dumps so every value is escaped
# correctly; compact separators keep the payload free of whitespace, as the
# hand-built string was. Dict insertion order fixes the key order.
claim = json.dumps(
    {
        "iss": sa_1,
        "scope": target_scopes_for_dwd,
        "aud": "https://accounts.google.com/o/oauth2/token",
        "sub": sub,
        "exp": exptime,
        "iat": now,
    },
    separators=(",", ":"),
)

# Use the impersonated credentials to sign the JWT (they expose a signer):
# https://google-auth.readthedocs.io/en/master/reference/google.auth.impersonated_credentials.html#google.auth.impersonated_credentials.Credentials.signer
#
# Open question: could sign_bytes optionally return a (signature, key_id)
# tuple? The signBlob REST response carries both:
# https://cloud.google.com/iam/docs/reference/credentials/rest/v1/projects.serviceAccounts/signBlob#response-body
#   {
#     "keyId": string,
#     "signedBlob": string
#   }
# With a key id the header could carry a "kid" entry, e.g.
#   {"alg": "RS256", "kid": "<key id>", "typ": "JWT"}
header = ('{"alg": "RS256", "typ": "JWT"}')

# JWT signing input: base64url(header) + "." + base64url(claims).
header_claims = '{}.{}'.format(_b64url(header.encode()), _b64url(claim.encode()))
signed_bytes = sa1_credentials.sign_bytes(header_claims.encode())
assertion = header_claims + '.' + _b64url(signed_bytes)

# WARNING: the signed JWT can be exchanged for an access token until it
# expires; printing it is for demonstration only -- do not log it in real use.
print('Signed JWT '+ assertion)
# Now exchange the signed JWT for an access token representing the subject `sub`.
# NOTE(review): this endpoint and the 'assertion' grant type look like the
# legacy form of Google's JWT-bearer flow; confirm against the current
# "OAuth 2.0 for Server to Server Applications" documentation.
url = 'https://accounts.google.com/o/oauth2/token'
data = {'grant_type' : 'assertion',
        'assertion_type' : 'http://oauth.net/grant_type/jwt/1.0/bearer',
        'assertion' : assertion }
headers = {"Content-type": "application/x-www-form-urlencoded"}

# Actually send the headers that were prepared, and bound the wait so a
# stalled connection cannot hang the script indefinitely.
resp = requests.post(url, data=data, headers=headers, timeout=30)

# Fail loudly with the server's explanation instead of hitting an opaque
# KeyError below when the exchange is rejected.
if not resp.ok:
    raise RuntimeError(
        f"Token exchange failed with HTTP {resp.status_code}: {resp.text}"
    )

## We finally have user2's token....
# Parse the response body once and read all three fields from it.
token_response = resp.json()
access_token = token_response['access_token']
expires_in = token_response['expires_in']
token_type = token_response['token_type']

## ... use that as a static token here
## https://github.com/googleapis/google-auth-library-python/issues/931
sc = StaticCredentials(token=access_token, expires_in=expires_in, token_type=token_type)
# --- Cloud Storage: list buckets using the static credentials `sc` ---
from google.cloud import storage

client = storage.Client(project=project, credentials=sc)
for bucket in client.list_buckets():
    print(bucket.name)

# --- Pub/Sub: list topics, first over raw REST, then via the client library ---
from google.cloud import pubsub_v1
from google.auth.transport.requests import AuthorizedSession

project_path = f"projects/{project}"

# Raw REST call through a session that attaches `sc` to each request.
authed_session = AuthorizedSession(sc)
topics_url = 'https://pubsub.googleapis.com/v1/{}/topics'.format(project_path)
response = authed_session.request('GET', topics_url)
print(response.json())

# Same listing through the Pub/Sub publisher client.
publisher = pubsub_v1.PublisherClient(credentials=sc)
for topic in publisher.list_topics(request={"project": project_path}):
    print(topic.name)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment