Last active
May 8, 2023 19:49
-
-
Save jay0lee/d2947fb11ba70ee172a28cb695d5e23d to your computer and use it in GitHub Desktop.
BCE PoC - One Active User Device
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# App Engine configuration for the One Active Device PoC.
runtime: python311
handlers:
# Main page (served by the Flask route for "/").
- url: /
  # "secure: always" together with the 301 code below redirects plain-HTTP
  # requests to HTTPS with a permanent redirect.
  secure: always
  script: auto
  redirect_http_response_code: 301
# Debug page (served by the Flask route for "/debug").
- url: /debug
  secure: always
  script: auto
  redirect_http_response_code: 301
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
main.py - BCE One Active Device PoC | |
Proof of concept script that works with Google Identity-Aware Proxy, BeyondCorp | |
Enterprise and Context Aware Access to limit users to one active device at a | |
time. | |
Setup steps: | |
1. Create an AppEngine App | |
2. Enable IAP for the App and determine its aud value:
https://cloud.google.com/iap/docs/signed-headers-howto#verifying_the_jwt_payload | |
enter this value for IAP_AUD below. | |
3. Set IAP to require reauthentication every 5 minutes. | |
https://cloud.google.com/iap/docs/configuring-reauth | |
4. Determine your CUSTOMER_ID at admin.google.com > Account > Account settings > | |
Profile. Remove the "C" prefix from the value and enter below. For example if | |
admin console shows C03uzfv2s then use 03uzfv2s. | |
5. Create a delegated admin with "Manage Devices and Settings" permission and | |
add the AppEngine service account to the role. | |
https://cloud.google.com/appengine/docs/flexible/configure-service-accounts | |
https://support.google.com/a/answer/6208982 | |
this allows the service account to update the client state for your devices | |
marking them as enabled or disabled. | |
6. Deploy your AppEngine app. | |
7. Create a CAA / ACM access level, set it to advanced with a condition of: | |
device.vendors["customer-OAD"].data["active_device"] == true | |
8. Assign the access level to the desired Workspace services or GCP apps. | |
9. Configure the CAA block message to direct the users to the AppEngine app | |
so that they can switch active device when blocked. | |
''' | |
from json import dumps | |
from flask import Flask, request | |
import google.auth | |
from google.auth.transport import requests | |
from google.oauth2 import id_token | |
from googleapiclient.discovery import build | |
from googleapiclient.errors import HttpError | |
# Flask application object; the route decorators further down register
# their handlers on it.
app = Flask(__name__)
# UPDATE THESE VALUES
# Workspace customer ID without the leading "C" (see setup step 4).
CUSTOMER_ID = ''
# Audience value of the IAP signed header JWT (see setup step 2).
IAP_AUD = ''
# END UPDATE VALUES

# Resource name of the customer, passed as "customer" on API calls.
CUSTOMER = 'customers/' + CUSTOMER_ID
# Suffix of the client state resource name maintained by this app.
CLIENT_STATE = '/clientStates/' + CUSTOMER_ID + '-OAD'

# Fallback display labels keyed by the API's deviceType value; used when a
# device reports neither manufacturer nor model.
DEVICE_TYPES = {
    'DEVICE_TYPE_UNSPECIFIED': 'Unknown Device',
    'ANDROID': 'Android Device',
    'IOS': 'Apple iOS Device',
    'GOOGLE_SYNC': 'Google Sync (Activesync) Device',
    'WINDOWS': 'Windows Device',
    'MAC_OS': 'Apple MacOS Device',
    'LINUX': 'Linux Device',
    'CHROME_OS': 'Chrome OS Device',
}
def validate_iap_jwt(iap_jwt, expected_audience):
    """Validate an IAP JWT.

    Args:
      iap_jwt: The contents of the X-Goog-IAP-JWT-Assertion header.
      expected_audience: The Signed Header JWT audience. See
          https://cloud.google.com/iap/docs/signed-headers-howto
          for details on how to get this value.

    Returns:
      (decoded_jwt, error_str). On success error_str is None; on any
      failure decoded_jwt is None and error_str describes the problem.
    """
    expect_iss = 'https://cloud.google.com/iap'
    # A request without the header reaches this function as an empty or
    # non-string value. Report that directly instead of handing it to the
    # verifier.
    if not iap_jwt or not isinstance(iap_jwt, (str, bytes)):
        return (None, 'ERROR: No IAP JWT was supplied with the request')
    try:
        decoded_jwt = id_token.verify_token(
            iap_jwt, requests.Request(), audience=expected_audience,
            certs_url='https://www.gstatic.com/iap/verify/public_key')
    except google.auth.exceptions.InvalidValue as err:
        return (None, f'ERROR: JWT validation error: {err}')
    except google.auth.exceptions.MalformedError as err:
        return (None, f'ERROR: Invalid JWT Error: {err}')
    except ValueError as err:
        # Fallback for verification failures raised as a plain ValueError
        # rather than one of the two classes above.
        return (None, f'ERROR: JWT validation error: {err}')
    except google.auth.exceptions.TransportError as err:
        # The public keys at certs_url could not be fetched.
        return (None, f'ERROR: could not fetch IAP public keys: {err}')
    if (got_iss := decoded_jwt.get('iss')) != expect_iss:
        return (None, f'ERROR: JWT Issuer invalid. Expected {expect_iss} got {got_iss}')
    return (decoded_jwt, None)
def get_current_device_id_and_user():
    """Return the device ID and user email asserted by IAP for this request.

    Reads the X-Goog-Iap-Jwt-Assertion header of the current Flask request,
    validates it against IAP_AUD and extracts the ``google.device_id`` and
    ``email`` claims from the decoded JWT.

    Args:
      None

    Returns:
      (device_id, user, err) - current device ID string and user email
      string with err set to None on success; (None, None, error string)
      on failure.
    """
    jwt_header = request.headers.get('X-Goog-Iap-Jwt-Assertion')
    if not jwt_header:
        # Without the header there is nothing to validate.
        return (None,
                None,
                'ERROR: could not decode IAP JWT: request is missing the '
                'X-Goog-Iap-Jwt-Assertion header')
    jwt, err = validate_iap_jwt(jwt_header, IAP_AUD)
    if err:
        return (None, None, f'ERROR: could not decode IAP JWT: {err}')
    current_device_id = (jwt.get('google') or {}).get('device_id')
    if not current_device_id:
        return (None,
                None,
                'ERROR: Couldn\'t get current device ID. Make sure you\'ve '
                'added your Google account to the device and try logging '
                'out and back into Chrome Sync.')
    user = jwt.get('email')
    if not user:
        # The email is used to build the device list filters; a missing
        # claim would otherwise query for the literal string "None".
        return (None, None, 'ERROR: IAP JWT does not contain an email claim')
    return (current_device_id, user, None)
def build_cloud_identity_service():
    """Returns Cloud Identity Service authorized as service account.

    Uses Application Default Credentials restricted to the Cloud Identity
    devices scope.

    Returns:
      A googleapiclient service object for the ``cloudidentity`` v1 API.
    """
    # google.auth.default() documents ``scopes`` as a sequence of strings.
    # Credential types that join the sequence would split a bare string
    # into single characters, so pass a list.
    scopes = ['https://www.googleapis.com/auth/cloud-identity.devices']
    creds, _ = google.auth.default(scopes=scopes)
    return build('cloudidentity', 'v1', credentials=creds)
def get_all_api_pages(svc, function, args, items):
    """Returns all pages of a given list API call.

    Args:
      svc: API resource object exposing the list method.
      function: name of the method on ``svc`` to call, e.g. 'list'.
      args: dict of keyword arguments for the method. It is copied, so the
          caller's dict is never modified.
      items: key of the response field that holds the list of results.

    Returns:
      (results, err) - concatenated items from every page and None, or
      (None, error string) if a request fails.
    """
    method = getattr(svc, function)
    # Work on a copy: the page token is added below and must not leak back
    # into the caller's dict.
    call_args = dict(args)
    results = []
    while True:
        try:
            page = method(**call_args).execute(num_retries=5)
        except HttpError as err:
            # Callers test the error for truthiness, so never hand back an
            # empty reason.
            return (None, err.reason or str(err))
        results.extend(page.get(items, []))
        next_token = page.get('nextPageToken')
        if not next_token:
            break
        call_args['pageToken'] = next_token
    return (results, None)
def get_devices(cloud_iden, user):
    '''Returns list of user's devices.

    Args:
      cloud_iden: Cloud Identity service object.
      user: email address used in the device list filter.

    Returns:
      (devices, err) - list of (device_id, device_name, model) tuples and
      None, or (None, error) when the list call fails.
    '''
    list_args = {
        'customer': CUSTOMER,
        'filter': f'email:{user}',
        'fields': 'devices(name,deviceId,deviceType,model,manufacturer),nextPageToken',
        'pageSize': 100,
    }
    listed, err = get_all_api_pages(cloud_iden.devices(),
                                    'list',
                                    list_args,
                                    'devices')
    if err:
        return (None, err)

    def as_tuple(device):
        '''Builds the (device_id, device_name, model) tuple for one device.'''
        maker = device.get('manufacturer', '')
        product = device.get('model', '')
        # Prefer "manufacturer model"; fall back to a label derived from
        # the device type when both are blank.
        label = f'{maker} {product}'.strip()
        if not label:
            label = DEVICE_TYPES.get(device.get('deviceType'), 'Unknown Device')
        return (device.get('deviceId'), device.get('name', ''), label)

    return ([as_tuple(device) for device in listed], None)
def get_userdevices(cloud_iden, user):
    '''Returns list of user's device profiles.

    Args:
      cloud_iden: Cloud Identity service object.
      user: email address used in the deviceUsers list filter.

    Returns:
      (names, err) - list of deviceUser resource names and None, or
      (None, error) when the list call fails.
    '''
    list_args = {
        'parent': 'devices/-',
        'customer': CUSTOMER,
        'filter': f'email:{user}',
        'pageSize': 20,
        'fields': 'deviceUsers(name),nextPageToken',
    }
    device_users_api = cloud_iden.devices().deviceUsers()
    listed, err = get_all_api_pages(device_users_api,
                                    'list',
                                    list_args,
                                    'deviceUsers')
    if err:
        return (None, err)
    names = []
    for device_user in listed:
        names.append(device_user.get('name'))
    return (names, None)
def get_clientstates(cloud_iden, user):
    '''Returns the user's device profile client states maintained by this app.

    Args:
      cloud_iden: Cloud Identity service object.
      user: email address used in the clientStates list filter.

    Returns:
      (client_states, err) - dict mapping client state resource name to the
      ``active_device`` boolValue (None when the key is not set) and None,
      or (None, error) when the list call fails.
    '''
    filter_ = f'email:{user}'
    args = {
        'parent': 'devices/-/deviceUsers/-',
        'customer': CUSTOMER,
        'filter': filter_,
        'fields': 'clientStates(name,keyValuePairs),nextPageToken',
    }
    result, err = get_all_api_pages(cloud_iden.devices().deviceUsers().clientStates(),
                                    'list',
                                    args,
                                    'clientStates')
    if err:
        return (None, err)
    client_states = {}
    for state in result:
        client_name = state.get('name') or ''
        if not client_name.endswith(CLIENT_STATE):
            # ignore client states by vendors other than our -OAD state
            continue
        # A client state without key/value pairs may come back with no
        # keyValuePairs field; treat that as "active_device not set"
        # instead of failing on None.
        kvp = state.get('keyValuePairs') or {}
        active = kvp.get('active_device') or {}
        client_states[client_name] = active.get('boolValue')
    return (client_states, None)
def get_all_user_devices(cloud_iden, user):
    """Returns all device info of given user. 3 API endpoints are necessary:
      - cloudidentity.devices.list() - get deviceId attribute to match to
          IAP device_id.
      - cloudidentity.devices.deviceUsers.list() - get deviceUser name
          attribute used in updates
      - cloudidentity.devices.deviceUsers.clientStates.list() - get OAD
          client state values we maintain

    Args:
      cloud_iden: Cloud Identity service object.
      user: email address of user whose devices should be returned.

    Returns:
      Tuple of:
        devices: dictionary of user device IDs as key {
          device_name: device resource name for Cloud Identity API
          enabled: boolean, if this device is active
          model: display label for the device
          Optional deviceuser_name: deviceUser resource name for Cloud
              Identity API, present only when one was found for the device
        }
        err: if there was an error, None otherwise
    """
    user_devices, err = get_devices(cloud_iden, user)
    if err:
        return (None, err)
    user_deviceusers, err = get_userdevices(cloud_iden, user)
    if err:
        return (None, err)
    client_states, err = get_clientstates(cloud_iden, user)
    if err:
        return (None, err)
    # pull it all together
    all_user_devices = {}
    for device_id, device_name, device_model in user_devices:
        entry = {'device_name': device_name,
                 'enabled': False,
                 'model': device_model}
        all_user_devices[device_id] = entry
        # Child resources are named "<device name>/deviceUsers/...". Match
        # on the name plus "/" so a device whose name is a prefix of
        # another device's name (or is empty) cannot claim its children.
        prefix = f'{device_name}/'
        for deviceuser in user_deviceusers:
            if deviceuser and deviceuser.startswith(prefix):
                entry['deviceuser_name'] = deviceuser
                break
        for client_state, is_enabled in client_states.items():
            if client_state.startswith(prefix):
                # boolValue may be None when the key is unset.
                entry['enabled'] = bool(is_enabled)
                break
    return (all_user_devices, None)
def update_device(cloud_iden, du_name, enabled):
    '''Patches the client state of one deviceUser, setting active_device.

    Args:
      cloud_iden: Cloud Identity service object.
      du_name: deviceUser resource name; CLIENT_STATE is appended to it to
          form the client state resource name.
      enabled: boolean written as the active_device boolValue.
    '''
    state_name = f'{du_name}{CLIENT_STATE}'
    patch_body = {'keyValuePairs': {'active_device': {'boolValue': enabled}}}
    # The update mask lists the top-level fields present in the body.
    mask = ','.join(patch_body)
    client_states_api = cloud_iden.devices().deviceUsers().clientStates()
    patch_call = client_states_api.patch(
        name=state_name,
        body=patch_body,
        customer=CUSTOMER,
        updateMask=mask)
    patch_call.execute(num_retries=5)
def update_devices(cloud_iden, enable_device, all_user_devices):
    '''Loops through devices and enables/disables as needed.

    The device whose key equals enable_device is marked active; every other
    device currently marked enabled is marked inactive. Devices that are
    already inactive are left untouched.

    Args:
      cloud_iden: Cloud Identity service object.
      enable_device: device ID (key of all_user_devices) to make active.
      all_user_devices: dict of device ID to device info; updated in place.

    Returns:
      The same all_user_devices dict with 'enabled' reflecting the updates
      that were actually made.
    '''
    for device, val in all_user_devices.items():
        should_enable = device == enable_device
        if not should_enable and not val.get('enabled'):
            # Already inactive: nothing to change.
            continue
        du_name = val.get('deviceuser_name')
        if not du_name:
            # No deviceUser record was found for this device, so there is
            # no client state to patch. Leave 'enabled' as it is instead of
            # failing with KeyError.
            continue
        update_device(cloud_iden, du_name, should_enable)
        val['enabled'] = should_enable
    return all_user_devices
@app.route('/debug')
def debug():
    '''/debug page outputs decoded IAP JWT user / device info.

    Returns:
      HTML showing the decoded JWT as indented, key-sorted JSON, or an
      error string when the header is missing or fails validation.
    '''
    # Local import so this block does not depend on the file's import list.
    from html import escape

    jwt_header = request.headers.get('X-Goog-Iap-Jwt-Assertion')
    if not jwt_header:
        return ('ERROR: could not decode IAP JWT: request is missing the '
                'X-Goog-Iap-Jwt-Assertion header')
    jwt, err = validate_iap_jwt(jwt_header, IAP_AUD)
    if err:
        # The error text can echo parts of the submitted token.
        return escape(f'ERROR: could not decode IAP JWT: {err}', quote=False)
    # Escape first, then make the layout survive HTML rendering: browsers
    # collapse plain spaces and newlines, so use &nbsp; and <br>.
    pretty = escape(dumps(jwt, indent=4, sort_keys=True), quote=False)
    return pretty.replace(' ', '&nbsp;').replace('\n', '<br>')
@app.route('/')
def root():
    '''Main page: makes the requesting device the user's one active device.

    Looks up the user's devices and, unless the current device is already
    the enabled one, enables it and disables the others. The response
    lists every device with its resulting state.

    Returns:
      HTML for the result page, or an error string on failure.
    '''
    # Local import so this block does not depend on the file's import list.
    from html import escape

    current_device_id, user, err = get_current_device_id_and_user()
    if err:
        return escape(str(err), quote=False)
    cloud_iden = build_cloud_identity_service()
    all_user_devices, err = get_all_user_devices(cloud_iden, user)
    if err:
        return escape(str(err), quote=False)
    current = all_user_devices.get(current_device_id)
    if current is None:
        # Switching now would disable the user's other devices without
        # enabling this one, so stop before changing anything.
        return ('ERROR: This device was not found among the devices '
                'registered to your account, so your active device was '
                'not changed.')
    if current.get('enabled'):
        headline = '<b>This device is ALREADY your enabled device</b><br><br>'
    else:
        try:
            all_user_devices = update_devices(cloud_iden,
                                              current_device_id,
                                              all_user_devices)
        except HttpError as http_err:
            reason = http_err.reason or str(http_err)
            return escape(f'ERROR: could not switch active device: {reason}',
                          quote=False)
        if not all_user_devices.get(current_device_id, {}).get('enabled'):
            # Only claim success when the update really marked this device.
            return 'ERROR: This device could not be made your enabled device.'
        headline = '<b>This device is NOW your enabled device</b><br><br>'
    rows = []
    for dev in all_user_devices.values():
        marker = '✅ ' if dev.get('enabled') else '❌ '
        # Model strings come from the device inventory; escape them before
        # placing them in HTML.
        rows.append(f'{marker}{escape(str(dev["model"]), quote=False)}<br>')
    return '<p style="font-family:verdana">' + headline + ''.join(rows) + '</p>'
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
cryptography | |
Flask | |
gunicorn | |
google-auth | |
google-api-python-client | |
requests |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment