Skip to content

Instantly share code, notes, and snippets.

@thanakijwanavit
Last active November 17, 2020 10:29
Show Gist options
  • Save thanakijwanavit/5918c28703c82b607596d1ae707daebc to your computer and use it in GitHub Desktop.
Save thanakijwanavit/5918c28703c82b607596d1ae707daebc to your computer and use it in GitHub Desktop.
class Cognito:
    '''Client-side SDK wrapper for interacting with Amazon Cognito.

    The instance keeps the pool / app-client configuration it is given and
    creates two boto3 clients bound to ``region``: ``client`` for the
    ``cognito-idp`` service and ``identityClient`` for the
    ``cognito-identity`` service.
    '''

    def __init__(
        self,
        identityPoolId,
        userPoolId,
        accountId,
        clientId,
        clientSecret,
        region='ap-southeast-1',
    ):
        '''Store the configuration values and build the boto3 clients.'''
        # Plain configuration, stored exactly as passed in.
        self.region = region
        self.accountId = accountId
        self.identityPoolId = identityPoolId
        self.userPoolId = userPoolId
        self.clientId = clientId
        self.clientSecret = clientSecret

        # Service clients, both created for the same region.
        self.client = boto3.client('cognito-idp', region_name=region)
        self.identityClient = boto3.client(
            'cognito-identity', region_name=region
        )
@add_method(Cognito)
def credenFromIdToken(self, logins):
    '''Get AWS credentials from Cognito login tokens.

    Args:
        logins: mapping sent as ``Logins`` to both identity-pool calls
            (provider name -> token).

    Returns:
        The ``Credentials`` entry of the ``get_credentials_for_identity``
        response.
    '''
    # Reuse the cognito-identity client created in __init__ (same service,
    # same region) rather than constructing a new client on every call.
    identityClient = self.identityClient

    # Step 1: resolve the identity id for these logins.
    identityId = identityClient.get_id(
        AccountId=self.accountId,
        IdentityPoolId=self.identityPoolId,
        Logins=logins,
    )['IdentityId']

    # Step 2: exchange the identity id for AWS credentials.
    credentials_response = identityClient.get_credentials_for_identity(
        IdentityId=identityId,
        Logins=logins,
    )
    return credentials_response['Credentials']
@add_method(Cognito)
def login(self, user: str, pw: str, *args, **kwargs):
    '''Log in to the user pool and fetch AWS credentials for the session.

    Args:
        user: username handed to ``AWSSRP``.
        pw: password handed to ``AWSSRP``.
        *args, **kwargs: accepted for call compatibility; not used.

    Returns:
        The credentials dict from ``credenFromIdToken``, updated with every
        entry of ``tokens['AuthenticationResult']`` and with ``Expiration``
        replaced by the result of its ``.timestamp()`` call.
    '''
    self.srp = AWSSRP(
        username=user,
        password=pw,
        pool_id=self.userPoolId,
        client_id=self.clientId,
        client_secret=self.clientSecret,
        client=self.client,
    )
    tokens = self.srp.authenticate_user()

    # NOTE(review): these debug prints write the raw tokens to stdout;
    # kept for behavioral compatibility, but consider removing them.
    print('tokens are length')
    printDict(tokens)

    authResult = tokens['AuthenticationResult']

    # The provider key must name the region the user pool lives in, so it
    # is built from self.region instead of a hard-coded region string.
    provider = f'cognito-idp.{self.region}.amazonaws.com/{self.userPoolId}'
    logins = {provider: authResult['IdToken']}
    printDict(logins)

    creden = self.credenFromIdToken(logins)
    creden.update(authResult)
    # Presumably a datetime from boto3; converted to a numeric timestamp.
    creden['Expiration'] = creden['Expiration'].timestamp()
    return creden
@add_method(Cognito)
def refreshToken(self, user, refreshToken):
    '''Request new tokens using the ``REFRESH_TOKEN_AUTH`` flow.

    Args:
        user: username used to compute the ``SECRET_HASH``.
        refreshToken: refresh token sent as ``REFRESH_TOKEN``.

    Returns:
        The ``initiate_auth`` response, or the error response
        (``e.response``) when boto raises a ``ClientError``.
    '''
    try:
        # Use the region-bound cognito-idp client from __init__; a fresh
        # boto3.client('cognito-idp') would ignore self.region and depend
        # on whatever default region the environment provides.
        return self.client.initiate_auth(
            ClientId=self.clientId,
            AuthFlow='REFRESH_TOKEN_AUTH',
            AuthParameters={
                'REFRESH_TOKEN': refreshToken,
                'SECRET_HASH': self.getSecretHash(user),
            },
        )
    except botocore.exceptions.ClientError as e:
        return e.response
@add_method(Cognito)
def getSecretHash(self, username):
    '''Compute the secret hash for ``username``.

    The hash is an HMAC-SHA256 over ``username + clientId``, keyed with the
    client secret, and returned as a base64-encoded string.
    '''
    key = self.clientSecret.encode()
    payload = (username + self.clientId).encode('UTF-8')
    mac = hmac.new(key, msg=payload, digestmod=hashlib.sha256)
    return base64.b64encode(mac.digest()).decode()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment