Skip to content

Instantly share code, notes, and snippets.

@jay0lee
Last active May 8, 2023 19:49
Show Gist options
  • Save jay0lee/d2947fb11ba70ee172a28cb695d5e23d to your computer and use it in GitHub Desktop.
Save jay0lee/d2947fb11ba70ee172a28cb695d5e23d to your computer and use it in GitHub Desktop.
BCE PoC - One Active User Device
runtime: python311
handlers:
- url: /
secure: always
script: auto
redirect_http_response_code: 301
- url: /debug
secure: always
script: auto
redirect_http_response_code: 301
'''
main.py - BCE One Active Device PoC
Proof of concept script that works with Google Identity-Aware Proxy, BeyondCorp
Enterprise and Context Aware Access to limit users to one active device at a
time.
Setup steps:
1. Create an AppEngine App
2. Enable IAP for the App and determine its aud value:
https://cloud.google.com/iap/docs/signed-headers-howto#verifying_the_jwt_payload
enter this value for IAP_AUD below.
3. Set IAP to require reauthentication every 5 minutes.
https://cloud.google.com/iap/docs/configuring-reauth
4. Determine your CUSTOMER_ID at admin.google.com > Account > Account settings >
Profile. Remove the "C" prefix from the value and enter below. For example if
admin console shows C03uzfv2s then use 03uzfv2s.
5. Create a delegated admin with "Manage Devices and Settings" permission and
add the AppEngine service account to the role.
https://cloud.google.com/appengine/docs/flexible/configure-service-accounts
https://support.google.com/a/answer/6208982
this allows the service account to update the client state for your devices
marking them as enabled or disabled.
6. Deploy your AppEngine app.
7. Create a CAA / ACM access level, set it to advanced with a condition of:
device.vendors["customer-OAD"].data["active_device"] == true
8. Assign the access level to the desired Workspace services or GCP apps.
9. Configure the CAA block message to direct the users to the AppEngine app
so that they can switch active device when blocked.
'''
from json import dumps
from flask import Flask, request
import google.auth
from google.auth.transport import requests
from google.oauth2 import id_token
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
# Flask application object; the @app.route decorators in this module register
# their handlers on it.
app = Flask(__name__)
# UPDATE THESE VALUES
# Workspace customer ID without the leading "C" (see setup step 4 in the
# module docstring).
CUSTOMER_ID = ''
# Expected audience of the IAP signed-header JWT (see setup step 2 in the
# module docstring).
IAP_AUD = ''
# END UPDATE VALUES
# Customer resource name, passed as the `customer` argument of the Cloud
# Identity API calls made in this module.
CUSTOMER = f'customers/{CUSTOMER_ID}'
# Resource-name suffix of the client state this app reads and patches; it is
# appended to a device user name to form the full client state name.
CLIENT_STATE = f'/clientStates/{CUSTOMER_ID}-OAD'
# Human-readable fallback labels keyed by the device's `deviceType` value;
# used when a device reports neither manufacturer nor model.
DEVICE_TYPES = {
'DEVICE_TYPE_UNSPECIFIED': 'Unknown Device',
'ANDROID': 'Android Device',
'IOS': 'Apple iOS Device',
'GOOGLE_SYNC': 'Google Sync (Activesync) Device',
'WINDOWS': 'Windows Device',
'MAC_OS': 'Apple MacOS Device',
'LINUX': 'Linux Device',
'CHROME_OS': 'Chrome OS Device',
}
def validate_iap_jwt(iap_jwt, expected_audience):
    """Validate an IAP JWT.

    Args:
      iap_jwt: The contents of the X-Goog-IAP-JWT-Assertion header.
      expected_audience: The Signed Header JWT audience. See
          https://cloud.google.com/iap/docs/signed-headers-howto
          for details on how to get this value.

    Returns:
      (decoded_jwt, error_str). On success decoded_jwt is the dict of verified
      claims and error_str is None; on failure decoded_jwt is None and
      error_str describes the problem.
    """
    # Callers pass a non-string placeholder when the header is absent. Reject
    # that (and empty values) up front instead of handing it to the verifier,
    # which would otherwise fetch the public keys before failing.
    if not iap_jwt or not isinstance(iap_jwt, (str, bytes)):
        return (None, 'ERROR: Missing or empty IAP JWT assertion header')
    expect_iss = 'https://cloud.google.com/iap'
    try:
        decoded_jwt = id_token.verify_token(
            iap_jwt, requests.Request(), audience=expected_audience,
            certs_url='https://www.gstatic.com/iap/verify/public_key')
    except google.auth.exceptions.InvalidValue as err:
        return (None, f'ERROR: JWT validation error: {err}')
    except google.auth.exceptions.MalformedError as err:
        return (None, f'ERROR: Invalid JWT Error: {err}')
    except google.auth.exceptions.TransportError as err:
        # Fetching the IAP public keys failed; report it like any other
        # validation failure rather than letting the request crash.
        return (None, f'ERROR: Could not fetch IAP public keys: {err}')
    if (got_iss := decoded_jwt.get('iss')) != expect_iss:
        return (None, f'ERROR: JWT Issuer invalid. Expected {expect_iss} got {got_iss}')
    return (decoded_jwt, None)
def get_current_device_id_and_user():
    """Returns the device ID and user email of the current request.

    Reads the X-Goog-Iap-Jwt-Assertion header of the current Flask request,
    validates it against IAP_AUD and extracts the device ID and email claims.

    Args:
      None

    Returns:
      (device_id, user, err) - current device ID string and user email string
      with err set to None, or (None, None, error string) on failure.
    """
    jwt_header = request.headers.get('X-Goog-Iap-Jwt-Assertion')
    if not jwt_header:
        # Fail early with a clear message instead of passing a placeholder
        # value on to the JWT validator.
        return (None,
                None,
                'ERROR: could not decode IAP JWT: '
                'X-Goog-Iap-Jwt-Assertion header is missing')
    jwt, err = validate_iap_jwt(jwt_header, IAP_AUD)
    if err:
        return (None, None, f'ERROR: could not decode IAP JWT: {err}')
    current_device_id = jwt.get('google', {}).get('device_id')
    if not current_device_id:
        return (None,
                None,
                'ERROR: Couldn\'t get current device ID. Make sure you\'ve '
                'added your Google account to the device and try logging '
                'out and back into Chrome Sync.')
    user = jwt.get('email')
    if not user:
        # Without an email the device lookups would filter on "email:None".
        return (None, None, 'ERROR: IAP JWT did not include a user email.')
    return (current_device_id, user, None)
def build_cloud_identity_service():
    """Returns Cloud Identity Service authorized as service account

    Credentials come from the application default credentials, requested with
    the Cloud Identity devices scope.
    """
    # google.auth.default() documents `scopes` as a sequence of strings; a
    # bare string may be split character by character by some credential
    # types, so always pass a list.
    scopes = ['https://www.googleapis.com/auth/cloud-identity.devices']
    creds, _ = google.auth.default(scopes=scopes)
    return build('cloudidentity', 'v1', credentials=creds)
def get_all_api_pages(svc, function, args, items):
    """Returns all pages of a given list API call

    Args:
      svc: API resource object exposing the list method.
      function: name of the method on svc to call for each page.
      args: dict of keyword arguments for the method. It is copied, so the
          caller's dict is never modified.
      items: key in each response page that holds the list of results.

    Returns:
      (results, err) - list of items gathered from every page and None, or
      (None, error string) if any page request raised HttpError.
    """
    method = getattr(svc, function)
    # Work on a copy so adding pageToken does not leak into the caller's dict.
    call_args = dict(args)
    results = []
    while True:
        try:
            page = method(**call_args).execute(num_retries=5)
        except HttpError as err:
            # Callers test the error for truthiness, so never hand back an
            # empty reason for a failed request.
            return (None, err.reason or str(err))
        results.extend(page.get(items, []))
        next_token = page.get('nextPageToken')
        if not next_token:
            break
        call_args['pageToken'] = next_token
    return (results, None)
def get_devices(cloud_iden, user):
    '''Returns list of user's devices

    Args:
      cloud_iden: Cloud Identity API service object.
      user: email address used in the "email:" list filter.

    Returns:
      (devices, err) - list of (device_id, device_name, model) tuples and
      None, or (None, error) if the list call failed.
    '''
    def label_for(entry):
        # Prefer "<manufacturer> <model>"; fall back to a generic label for
        # the device type when both are blank.
        maker = entry.get('manufacturer', '')
        kind = entry.get('model', '')
        label = f'{maker} {kind}'.strip()
        return label or DEVICE_TYPES.get(entry.get('deviceType'),
                                         'Unknown Device')

    listing, err = get_all_api_pages(
        cloud_iden.devices(),
        'list',
        {
            'customer': CUSTOMER,
            'filter': f'email:{user}',
            'fields': 'devices(name,deviceId,deviceType,model,manufacturer),nextPageToken',
            'pageSize': 100,
        },
        'devices')
    if err:
        return (None, err)
    summaries = [
        (entry.get('deviceId'), entry.get('name', ''), label_for(entry))
        for entry in listing
    ]
    return (summaries, None)
def get_userdevices(cloud_iden, user):
    '''Returns list of user's device profiles

    Args:
      cloud_iden: Cloud Identity API service object.
      user: email address used in the "email:" list filter.

    Returns:
      (names, err) - list of device user resource names and None, or
      (None, error) if the list call failed.
    '''
    profiles, err = get_all_api_pages(
        cloud_iden.devices().deviceUsers(),
        'list',
        {
            'parent': 'devices/-',
            'customer': CUSTOMER,
            'filter': f'email:{user}',
            'pageSize': 20,
            'fields': 'deviceUsers(name),nextPageToken',
        },
        'deviceUsers')
    if err:
        return (None, err)
    names = []
    for profile in profiles:
        names.append(profile.get('name'))
    return (names, None)
def get_clientstates(cloud_iden, user):
    '''Returns the user's device profile client states maintained by this app

    Args:
      cloud_iden: Cloud Identity API service object.
      user: email address used in the "email:" list filter.

    Returns:
      (client_states, err) - dict mapping each client state resource name
      ending in CLIENT_STATE to its active_device boolValue (None when the
      value is not set), and None; or (None, error) if the list call failed.
    '''
    filter_ = f'email:{user}'
    args = {
        'parent': 'devices/-/deviceUsers/-',
        'customer': CUSTOMER,
        'filter': filter_,
        'fields': 'clientStates(name,keyValuePairs),nextPageToken',
    }
    result, err = get_all_api_pages(
        cloud_iden.devices().deviceUsers().clientStates(),
        'list',
        args,
        'clientStates')
    if err:
        return (None, err)
    client_states = {}
    for state in result:
        # Tolerate entries without a name rather than crashing on None.
        client_name = state.get('name') or ''
        if not client_name.endswith(CLIENT_STATE):
            # ignore client states by vendors other than our -OAD state
            continue
        # A client state may carry no key/value pairs at all; treat that the
        # same as "active_device not set".
        kvp = state.get('keyValuePairs') or {}
        active = kvp.get('active_device') or {}
        client_states[client_name] = active.get('boolValue')
    return (client_states, None)
def get_all_user_devices(cloud_iden, user):
    """Returns all device info of given user. 3 API endpoints are necessary:
    - cloudidentity.devices.list() - get deviceId attribute to match to IAP device_id.
    - cloudidentity.devices.deviceUsers.list() - get deviceUser name
      attribute used in updates
    - cloudidentity.devices.deviceUsers.clientStates.list() - get OAD
      client state values we maintain

    Args:
      cloud_iden: Cloud Identity API service object.
      user: email address of user whose devices should be returned.

    Returns:
      Tuple of:
        devices: dictionary of user device IDs as key {
          device_name: device resource name for Cloud Identity API
          enabled: boolean, if this device is active
          model: display label for the device
          Optional deviceuser_name: device user resource name, present only
              when a device user belonging to this device was found
        }
        err: if there was an error, None otherwise
    """
    user_devices, err = get_devices(cloud_iden, user)
    if err:
        return (None, err)
    user_deviceusers, err = get_userdevices(cloud_iden, user)
    if err:
        return (None, err)
    client_states, err = get_clientstates(cloud_iden, user)
    if err:
        return (None, err)
    # pull it all together
    all_user_devices = {}
    for device_id, device_name, device_model in user_devices:
        info = {'device_name': device_name,
                'enabled': False,
                'model': device_model}
        # Child resource names are matched as "<device name>/...". Including
        # the trailing slash stops one device name from matching another that
        # merely shares its leading characters, and stops an empty device
        # name from matching everything.
        prefix = f'{device_name}/'
        deviceuser = next(
            (name for name in user_deviceusers
             if name and name.startswith(prefix)),
            None)
        if deviceuser is not None:
            info['deviceuser_name'] = deviceuser
        for client_state, is_enabled in client_states.items():
            if client_state.startswith(prefix):
                # An unset value counts as "not enabled".
                info['enabled'] = bool(is_enabled)
                break
        all_user_devices[device_id] = info
    return (all_user_devices, None)
def update_device(cloud_iden, du_name, enabled):
    '''Updates given device marking it as active or inactive

    Args:
      cloud_iden: Cloud Identity API service object.
      du_name: device user resource name whose client state is patched.
      enabled: boolean stored as the active_device value.
    '''
    payload = {'keyValuePairs': {'active_device': {'boolValue': enabled}}}
    # Only the top-level fields present in the payload are listed in the mask.
    mask = ','.join(payload)
    states_api = cloud_iden.devices().deviceUsers().clientStates()
    patch_request = states_api.patch(
        name=f'{du_name}{CLIENT_STATE}',
        body=payload,
        customer=CUSTOMER,
        updateMask=mask)
    patch_request.execute(num_retries=5)
def update_devices(cloud_iden, enable_device, all_user_devices):
    '''Loops through devices and enables/disables as needed.

    Args:
      cloud_iden: Cloud Identity API service object.
      enable_device: device ID that should become the active device.
      all_user_devices: dict of device ID -> device info dict. The 'enabled'
          value of each entry that gets updated is changed in place.

    Returns:
      The same all_user_devices dict. Entries without a 'deviceuser_name'
      cannot be updated through the API and are left untouched.
    '''
    for device, val in all_user_devices.items():
        du_name = val.get('deviceuser_name')
        if not du_name:
            # No device user resource is known for this device, so there is
            # nothing to patch; skip it instead of raising KeyError.
            continue
        if device == enable_device:
            update_device(cloud_iden, du_name, True)
            val['enabled'] = True
        elif val.get('enabled'):
            update_device(cloud_iden, du_name, False)
            val['enabled'] = False
    return all_user_devices
@app.route('/debug')
def debug():
    '''/debug page outputs decoded IAP JWT user / device info

    Returns the validated JWT claims as indented, key-sorted JSON with spaces
    and newlines swapped for HTML equivalents, or an error string when the
    JWT cannot be validated.
    '''
    assertion = request.headers.get('X-Goog-Iap-Jwt-Assertion', {})
    claims, problem = validate_iap_jwt(assertion, IAP_AUD)
    if problem:
        return f'ERROR: could not decode IAP JWT: {problem}'
    pretty = dumps(claims, indent=4, sort_keys=True)
    # Keep the JSON layout when the browser renders the response as HTML.
    pretty = pretty.replace(' ', '&nbsp;')
    return pretty.replace('\n', '<br>')
@app.route('/')
def root():
    '''Main page: makes the requesting device the user's one active device.

    Identifies the current device and user from the IAP JWT, loads the user's
    devices, switches the active device when needed, and returns an HTML
    listing of the user's devices with their enabled state. Any failure is
    returned as an error string.
    '''
    # Local import so this handler carries its own dependency.
    from html import escape

    current_device_id, user, err = get_current_device_id_and_user()
    if err:
        return err
    cloud_iden = build_cloud_identity_service()
    all_user_devices, err = get_all_user_devices(cloud_iden, user)
    if err:
        return err
    if current_device_id not in all_user_devices:
        # Switching to a device we cannot find would only disable the
        # currently active device and leave the user with none, so stop here.
        return ('ERROR: This device was not found among your registered '
                'devices. No changes were made.')
    parts = ['<p style="font-family:verdana">']
    if all_user_devices[current_device_id].get('enabled'):
        parts.append('<b>This device is ALREADY your enabled device</b><br><br>')
    else:
        try:
            all_user_devices = update_devices(cloud_iden,
                                              current_device_id,
                                              all_user_devices)
        except HttpError as http_err:
            # Report API failures the same way the list calls do.
            return f'ERROR: could not update device state: {http_err.reason or http_err}'
        if all_user_devices.get(current_device_id, {}).get('enabled'):
            parts.append('<b>This device is NOW your enabled device</b><br><br>')
        else:
            parts.append('<b>This device could NOT be enabled</b><br><br>')
    for dev in all_user_devices.values():
        marker = '✅ ' if dev.get('enabled') else '❌ '
        # The model text comes from device-reported data; escape it before
        # placing it in the HTML response.
        parts.append(f'{marker}{escape(str(dev["model"]))}<br>')
    parts.append('</p>')
    return ''.join(parts)
cryptography
Flask
gunicorn
google-auth
google-api-python-client
requests
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment