Last active
January 12, 2024 13:13
-
-
Save jpbarto/c484c923c365b3e391b8eb5029cbaebc to your computer and use it in GitHub Desktop.
Simple script to read users in a Cognito user pool, check them for failed logins, and put those failed logins to CloudWatch logs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
The following script demonstrates how to use the AWS Boto3 SDK to iterate through | |
all of the users in an AWS Cognito User Pool and examine the events associated | |
with each user. | |
If any failed authentication events are found the script formats them as messages | |
and logs them to CloudWatch logs. | |
This script could easily be modified to run periodically as a Lambda function | |
triggered by a scheduled CloudWatch rule. If transforming this script to a | |
Lambda function I would recommend using the os.environ to retrieve parameters for | |
the Lambda function such as AWS Region and Cognito User Pool identity. | |
""" | |
import boto3 | |
# Resolve the AWS region from the default boto3 session configuration.
region = boto3.session.Session().region_name

# Clients for the two services this script talks to: Cognito user pools
# ('cognito-idp') and CloudWatch Logs ('logs').
idp = boto3.client('cognito-idp', region_name=region)
logs = boto3.client('logs', region_name=region)

# Identifier of the Cognito user pool to scan; replace the placeholder value.
user_pool_id = '<your user pool here>'
def get_users():
    """
    Retrieve every user from the Cognito user pool.

    The list-users call returns users one page at a time. This function
    follows the PaginationToken found in each response until no further
    token is returned, so pools larger than a single page are read in full.

    Returns a list of dictionaries in the form of:
    [
        {'username': 'user001', 'email': 'user001@example.com'},
        ...
        {'username': 'scott', 'email': 'scott@example.com'}
    ]
    'email' is None for a user without an email attribute.
    """
    users = []
    request = {'UserPoolId': user_pool_id, 'AttributesToGet': ['email']}

    while True:
        page = idp.list_users(**request)

        # extract username and email from each user on this page
        for user in page.get('Users', []):
            record = {'username': user['Username'], 'email': None}
            # tolerate a user record that carries no 'Attributes' key
            for attr in user.get('Attributes', []):
                if attr['Name'] == 'email':
                    record['email'] = attr['Value']
            users.append(record)

        # a missing/empty token means the last page has been read
        token = page.get('PaginationToken')
        if not token:
            break
        request['PaginationToken'] = token

    return users
def get_auth_events(username, max_results=5):
    """
    For a given username retrieve the most recent authentication events, up
    to a maximum of max_results events.

    username (string) name of the Cognito user whose events are fetched
    max_results (int) upper bound on the number of events requested;
        defaults to 5, the arbitrary limit originally chosen by the author

    Only the first page of the response is read; a 'next' token in the
    response (signalling more events) is not followed.

    Returns a list of dictionaries of the form
    [
        {'event_type': 'SignIn',
         'timestamp': '2018-09-24 23:58:04+00:00',
         'unix_timestamp': 1537833484000,
         'result': 'Fail',
         'ip_address': '192.158.68.23',
         'location_city': 'London',
         'location_country': 'United Kingdom'},
        ...
    ]
    'timestamp' is str() of the event's CreationDate and 'unix_timestamp' is
    the same instant in milliseconds. 'ip_address', 'location_city' and
    'location_country' are None when the event does not carry that detail.
    """
    # For more see
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cognito-idp.html#CognitoIdentityProvider.Client.admin_list_user_auth_events
    events_resp = idp.admin_list_user_auth_events(
        UserPoolId=user_pool_id,
        Username=username,
        MaxResults=max_results)

    auth_events = []
    for event in events_resp.get('AuthEvents', []):
        created = event['CreationDate']
        # context details are looked up defensively so that one event with
        # missing location data does not abort the whole scan with KeyError
        context = event.get('EventContextData', {})
        auth_events.append({
            'event_type': event['EventType'],
            'timestamp': str(created),
            'unix_timestamp': int(created.timestamp() * 1000),
            'result': event['EventResponse'],
            'ip_address': context.get('IpAddress'),
            'location_city': context.get('City'),
            'location_country': context.get('Country'),
        })

    return auth_events
def prepare_log_stream(group_name, stream_name):
    """
    Create a CloudWatch log group and log stream if they don't already exist.

    group_name (string) name of the log group to be created
    stream_name (string) name of the log stream to be created in the group

    Returns None
    """
    # The describe calls filter by prefix, so the returned names are compared
    # for exact equality to decide whether the group/stream already exists.
    # NOTE(review): only the first page of each describe response is checked.
    groups_resp = logs.describe_log_groups(logGroupNamePrefix=group_name)
    group_exists = any(
        group['logGroupName'] == group_name
        for group in groups_resp['logGroups'])

    # if the group wasn't found assume it doesn't exist and create it, using
    # the name passed in by the caller rather than a module-level global
    if not group_exists:
        logs.create_log_group(logGroupName=group_name)

    streams_resp = logs.describe_log_streams(
        logGroupName=group_name,
        logStreamNamePrefix=stream_name)
    stream_exists = any(
        stream['logStreamName'] == stream_name
        for stream in streams_resp['logStreams'])

    # if the stream wasn't found, create it inside the requested group
    if not stream_exists:
        logs.create_log_stream(
            logGroupName=group_name,
            logStreamName=stream_name)
# Iterate over all the users in the user pool, retrieve authentication events
# for every user and log failed login attempts to CloudWatch logs.

# arbitrary group and stream name for CloudWatch log messages
GROUP_NAME = '/myorg/myapp'
STREAM_NAME = 'failed_logins'

# Ensure that the log group and stream exist before proceeding. This does not
# depend on the user being processed, so it is done once up front instead of
# once per user.
prepare_log_stream(GROUP_NAME, STREAM_NAME)

for user in get_users():
    log_entries = []

    # iterate over all the events for a user and look for failed logins
    for event in get_auth_events(user['username']):
        if event['result'] != 'Fail':
            continue

        log_message = "WARN: User {} ({}) failed to login at {} from {} in {}, {}".format(
            user['username'],
            user['email'],
            event['timestamp'],
            event['ip_address'],
            event['location_city'],
            event['location_country'])

        # CloudWatch log entries need to have a timestamp (in millis since
        # 1970) and a message; the timestamp is the one retrieved from Cognito
        log_entries.append({
            'timestamp': event['unix_timestamp'],
            'message': log_message
        })

    # A user without failed logins produces no entries; sending an empty
    # batch to put_log_events is an error, so move on to the next user.
    if not log_entries:
        continue

    # CloudWatch logs requires the log entries to be ordered by timestamp
    log_entries.sort(key=lambda entry: entry['timestamp'])

    # use the put log events CloudWatch API to record the messages to CW logs
    # for more info see
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.put_log_events
    # NOTE(review): the API also restricts how old events may be and how wide
    # a time span one batch may cover - confirm old Cognito events are accepted.
    logs.put_log_events(
        logGroupName=GROUP_NAME,
        logStreamName=STREAM_NAME,
        logEvents=log_entries
    )
To paginate through the users, you can use this:
def list_users(**kwargs):
    """
    Yield every user of a Cognito user pool as a flat dictionary.

    All keyword arguments are forwarded to the list_users call of the
    cognito_idp client (so UserPoolId is expected among them). Limit defaults
    to 60 and may be overridden by the caller. Pages are followed through the
    PaginationToken of each response until none is returned.

    Yields one dict per user mapping each attribute name to its value, plus
    a 'status' key holding the user's UserStatus.

    Best-effort: if a list_users call raises, the error is logged and the
    generator stops; users that were already yielded are unaffected.
    """
    import logging

    params = dict(kwargs)
    params.setdefault('Limit', 60)

    # Iterate instead of recursing: the token is overwritten in one params
    # dict, so it is never passed twice and deep pools cannot hit the
    # recursion limit.
    while True:
        try:
            result = cognito_idp.list_users(**params)
        except Exception:
            logging.getLogger(__name__).exception(
                'list_users call failed; stopping pagination')
            return

        for user in result.get('Users', []):
            attrs = {
                attr['Name']: attr['Value']
                for attr in user.get('Attributes', [])
            }
            yield dict(attrs, status=user['UserStatus'])

        token = result.get('PaginationToken')
        if not token:
            return
        params['PaginationToken'] = token
>>> for user in list_users(UserPoolId=...):
...     print(user)
@sergiors - Thank you for the pagination code. I had to make two fixes and then your code paginated properly for me.
- the last line of code is missing
user_pool_id
so that the function can recurse properly. - I got rid of the walrus (
:=
) operator because I am not using Python 3.8 yet. :)
def list_users(user_pool_id, **kwargs):
    """
    Yield every user of the given Cognito user pool as a flat dictionary.

    user_pool_id (string) identifier of the user pool to list
    kwargs       extra keyword arguments forwarded to every list_users call
                 of the cognito_idp client; Limit defaults to 60 and may be
                 overridden

    Pages are followed through the PaginationToken of each response until
    none is returned, and the caller's extra arguments are applied to every
    page, not only the first one.

    Yields one dict per user mapping each attribute name to its value, plus
    a 'status' key holding the user's UserStatus.
    """
    params = dict(kwargs, UserPoolId=user_pool_id)
    params.setdefault('Limit', 60)

    # Iterate instead of recursing so the caller's kwargs are kept for every
    # page and large pools cannot hit the recursion limit.
    while True:
        result = cognito_idp.list_users(**params)

        for user in result.get('Users', []):
            attrs = {
                attr['Name']: attr['Value']
                for attr in user.get('Attributes', [])
            }
            yield dict(attrs, status=user['UserStatus'])

        token = result.get('PaginationToken')
        if not token:
            return
        params['PaginationToken'] = token
@waterimp I recommend passing the UserPoolId parameter
using kwargs
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Note that in order for this to work, the User Pool must have
AdvancedSecurityMode
enabled. Otherwise, you cannot use the AdminListUserAuthEvents
command on line 89.