Last active
November 17, 2020 10:29
-
-
Save thanakijwanavit/5918c28703c82b607596d1ae707daebc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Cognito:
    '''Client-side helper holding Cognito configuration and boto3 clients.

    The constructor only records the identifiers it is given and creates
    one ``cognito-idp`` client and one ``cognito-identity`` client bound to
    ``region``; no request is sent to AWS here.
    '''

    def __init__(
        self,
        identityPoolId,
        userPoolId,
        accountId,
        clientId,
        clientSecret,
        region = 'ap-southeast-1'
    ):
        # boto3 clients, both pinned to the requested region
        self.client = boto3.client('cognito-idp', region_name=region)
        self.identityClient = boto3.client('cognito-identity', region_name=region)
        # pool / account identifiers
        self.region = region
        self.accountId = accountId
        self.userPoolId = userPoolId
        self.identityPoolId = identityPoolId
        # app-client credentials
        self.clientId = clientId
        self.clientSecret = clientSecret
@add_method(Cognito)
def credenFromIdToken(self, logins):
    '''Exchange a Cognito login map for AWS credentials.

    Args:
        logins: mapping passed unchanged as ``Logins`` to both identity
            calls (``login`` builds it as ``{provider name: id token}``).

    Returns:
        The ``'Credentials'`` entry of the ``get_credentials_for_identity``
        response.

    Any exception raised by the boto3 calls propagates to the caller.
    '''
    # Reuse the cognito-identity client created in __init__ (same region)
    # instead of constructing a new boto3 client on every call.
    identityClient = self.identityClient

    ## get identityId
    identityResponse = identityClient.get_id(
        AccountId=self.accountId,
        IdentityPoolId=self.identityPoolId,
        Logins=logins,
    )
    identityId = identityResponse['IdentityId']

    ## get aws credentials
    credentialsResponse = identityClient.get_credentials_for_identity(
        IdentityId=identityId,
        Logins=logins,
    )
    return credentialsResponse['Credentials']
@add_method(Cognito)
def login(self, user:str , pw:str, *args, **kwargs ):
    '''Log in to the user pool via SRP and fetch AWS credentials.

    Args:
        user: username handed to ``AWSSRP``.
        pw: password handed to ``AWSSRP``.
        *args, **kwargs: accepted for interface compatibility; unused.

    Returns:
        A dict combining the credentials returned by ``credenFromIdToken``
        with every key of ``tokens['AuthenticationResult']``;
        ``'Expiration'`` is replaced by the result of its ``.timestamp()``.

    Side effects:
        Stores the ``AWSSRP`` helper on ``self.srp`` and prints the token
        response and the logins map through ``printDict``.
    '''
    self.srp = AWSSRP(
        username=user,
        password=pw,
        pool_id=self.userPoolId,
        client_id=self.clientId,
        client_secret=self.clientSecret,
        client=self.client,
    )

    # login
    tokens = self.srp.authenticate_user()
    # NOTE(review): the next lines write raw auth tokens to stdout; kept
    # to preserve existing output, but consider removing outside debugging.
    print('tokens are length')
    printDict(tokens)

    authResult = tokens['AuthenticationResult']
    idToken = authResult['IdToken']

    ####### Getting the IAM credentials ############################
    # The provider name must carry the region the pool lives in, so it is
    # derived from self.region rather than hard-coded to ap-southeast-1.
    providerName = f'cognito-idp.{self.region}.amazonaws.com/{self.userPoolId}'
    logins = {providerName: idToken}
    printDict(logins)

    creden = self.credenFromIdToken(logins)
    creden.update(authResult)
    creden['Expiration'] = creden['Expiration'].timestamp()
    return creden
@add_method(Cognito)
def refreshToken(self, user,refreshToken):
    '''Request new tokens with the REFRESH_TOKEN_AUTH flow.

    Args:
        user: username used to compute the ``SECRET_HASH``.
        refreshToken: refresh token sent as ``REFRESH_TOKEN``.

    Returns:
        The ``initiate_auth`` response on success. When boto3 raises
        ``botocore.exceptions.ClientError`` the error's ``response`` is
        returned instead of raising, so callers must inspect the result.
    '''
    secretHash = self.getSecretHash(user)
    try:
        # Use the region-configured client from __init__; a bare
        # boto3.client('cognito-idp') ignores self.region and depends on
        # whatever default region the environment happens to provide.
        return self.client.initiate_auth(
            ClientId=self.clientId,
            AuthFlow='REFRESH_TOKEN_AUTH',
            AuthParameters={
                'REFRESH_TOKEN': refreshToken,
                'SECRET_HASH': secretHash,
            },
        )
    except botocore.exceptions.ClientError as e:
        return e.response
@add_method(Cognito)
def getSecretHash(self, username):
    '''Return the base64 text of HMAC-SHA256(clientSecret, username + clientId).

    The message is the username concatenated with the client id, keyed
    with the client secret; the raw digest is base64-encoded and decoded
    to ``str``.
    '''
    payload = (username + self.clientId).encode('UTF-8')
    key = self.clientSecret.encode()
    mac = hmac.new(key, msg=payload, digestmod=hashlib.sha256)
    encoded = base64.b64encode(mac.digest())
    return encoded.decode()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment