Skip to content

Instantly share code, notes, and snippets.

@chrisdlangton
Last active May 2, 2024 03:03
Show Gist options
  • Save chrisdlangton/11c77772fc0409d4d6d724ea4a6c28bd to your computer and use it in GitHub Desktop.
Save chrisdlangton/11c77772fc0409d4d6d724ea4a6c28bd to your computer and use it in GitHub Desktop.
Time-based HMAC signature SHA256, SHA512, SHA3-256, SHA3-384, SHA3-512, and BLAKE2 for Python Flask with Javascript Forge.js and Bash/OpenSSL/Curl clients
import hashlib
import hmac
from base64 import b64encode
from functools import wraps
from datetime import datetime, timedelta
from flask import request, abort
from flask_login import login_user
from models import User, ApiKey
def require_hmac(not_before_seconds: int = 3, expire_after_seconds: int = 3):
def deco_require_hmac(func):
@wraps(func)
def f_require_hmac(*args, **kwargs):
try:
incoming_data = request.get_data(as_text=True)
incoming_date = request.headers.get('X-Date')
incoming_digest = request.headers.get('X-Digest')
incoming_apikey = request.headers.get('X-ApiKey')
incoming_hmac = request.headers.get('X-Signature')
supported_digests = {
'HMAC-SHA256': hashlib.sha256,
'HMAC-SHA512': hashlib.sha512,
'HMAC-SHA3-256': hashlib.sha3_256,
'HMAC-SHA3-384': hashlib.sha3_384,
'HMAC-SHA3-512': hashlib.sha3_512,
'HMAC-BLAKE2B512': hashlib.blake2b,
}
if incoming_digest not in supported_digests.keys():
log.debug(f'X-Digest [{incoming_digest}] not supported')
return abort(401)
# base64 encode json for signing
if incoming_data:
incoming_data = b64encode(incoming_data.encode('ascii')).decode('ascii')
# not_before prevents replay attacks
compare_date = datetime.fromisoformat(incoming_date if not incoming_date.endswith('+00:00') else incoming_date[:-6])
not_before = datetime.utcnow() - timedelta(seconds=not_before_seconds)
expire_after = datetime.utcnow() + timedelta(seconds=expire_after_seconds)
# expire_after can assist with support for offline/aeroplane mode
if compare_date < not_before or compare_date > expire_after:
log.debug(f'compare_date {compare_date} not_before {not_before} expire_after {expire_after}')
return abort(401)
# fetch the correct shared-secret from database using ApiKey
api_key = ApiKey(api_key=incoming_apikey)
api_key.hydrate()
if api_key.api_key_secret is None:
log.info(f'Missing api_key: {incoming_apikey}')
return abort(401)
# Signing structure
signing_data = bytes(f'{request.method}\n{request.path}\n{incoming_date}\n{incoming_data}'.strip("\n"), 'utf-8')
# Sign HMAC using server-side secret
compare_hmac = hmac.new(bytes(api_key.api_key_secret, 'utf-8'), signing_data, supported_digests.get(incoming_digest)).hexdigest()
# Compare server-side HMAC with client provided HMAC
if not hmac.compare_digest(compare_hmac, incoming_hmac):
log.debug(f'api_key {api_key.api_key} {api_key.comment} signing_data {signing_data} incoming_hmac {incoming_hmac} compare_hmac {compare_hmac}')
return abort(401)
# Success - application login and process the request
user = User(user_id=api_key.user_id)
login_user(user)
ret = func(*args, **kwargs)
except Exception as err:
log.exception(err)
ret = abort(401)
return ret
return f_require_hmac
return deco_require_hmac
// assumes forge.0.10.0.min.js
const hmac_headers = (http_method, path_uri, json) => {
const d = new Date
d.setMilliseconds(0)
const date = d.toISOString().replace(/.000Z/, "")
let signing_data = `${http_method}\n${path_uri}\n${date}`
if (json) {
signing_data = `${signing_data}\n${btoa(json)}`
}
const hmac = forge.hmac.create()
// #### Secure key storage ####
// localStorage enforces same-origin cors
// localStorage.setItem occurs during user login (auth challenge)
// localStorage.removeItem when users logout (server will rotate)
// Use a single localStorage key name so if another user logs in
// it will over-write the previous user and effectively log out
// that other user (like a google account on Chrome)
hmac.start('sha512', localStorage.getItem('hmac-sha512'))
hmac.update(signing_data)
return {
'X-Digest': 'HMAC-SHA512',
'X-ApiKey': api_key,
'X-Date': date,
'X-Signature': hmac.digest().toHex()
}
}
const Api = Object.assign({
get_async: async(path_uri, headers) => {
const http_method = 'GET'
const url = `${api_scheme}${api_domain}${path_uri}`
const response = await fetch(url, {
credentials: 'omit',
method: http_method,
headers: Object.assign({}, hmac_headers(http_method, path_uri), headers)
})
return await response.json()
},
get: (path_uri, headers) => {
const http_method = 'GET'
const url = `${api_scheme}${app.api.domain}${path_uri}`
return fetch(url, {
credentials: 'omit',
method: http_method,
headers: Object.assign({}, hmac_headers(http_method, path_uri), headers)
}).then(response => response.json())
},
post_async: async(path_uri, data, headers) => {
const http_method = 'POST'
const url = `${app.api.scheme}${app.api.domain}${path_uri}`
const json = JSON.stringify(data)
const response = await fetch(url, {
credentials: 'omit',
method: http_method,
body: json,
headers: Object.assign({ 'Content-Type': 'application/json' }, hmac_headers(http_method, path_uri, json), headers),
})
return await response.json()
},
post: (path_uri, data, headers) => {
const http_method = 'POST'
const url = `${app.api.scheme}${app.api.domain}${path_uri}`
const json = JSON.stringify(data)
return fetch(url, {
credentials: 'omit',
method: http_method,
body: json,
headers: Object.assign({ 'Content-Type': 'application/json' }, hmac_headers(http_method, path_uri, json), headers),
}).then(response => response.json())
}
})
Api.get('/test').then(console.log)
Api.post('/test', {'test': 'something cool'}).then(console.log)
console.log(await Api.get_async('/test'))
console.log(await Api.post_async('/test', {'test': 'something cool'}))
#!/usr/bin/env sh
set -x
digest=$1 # sha256 sha512 sha3-256 sha3-384 sha3-512 blake2b512
http_method=$2 # GET POST
path_uri=$3
json_data=$4
base_path=http://localhost:5000
api_key=
api_key_secret=
req_date=$(TZ=UTC date +'%FT%T')
if ! [ -z "${json_data}" ]; then
echo -n ${json_data} | curl -s \
-X ${http_method} \
-H 'Content-Type: application/json' \
-H "X-Digest: HMAC-$(printf '%s\n' $digest | awk '{ print toupper($0) }')" \
-H "X-Signature: $(echo -n "${http_method}\n${path_uri}\n${req_date}\n$(echo -n $json_data | openssl enc -base64)" | openssl dgst -${digest} -hmac "${api_key_secret}" | sed 's/^.*= //')" \
-H "X-ApiKey: ${api_key}" \
-H "X-Date: ${req_date}" \
--data @- -- ${base_path}${path_uri}
else
curl -s \
-X ${http_method} \
-H 'Content-Type: application/json' \
-H "X-Digest: HMAC-$(printf '%s\n' $digest | awk '{ print toupper($0) }')" \
-H "X-Signature: $(echo -n "${http_method}\n${path_uri}\n${req_date}" | openssl dgst -${digest} -hmac "${api_key_secret}" | sed 's/^.*= //')" \
-H "X-ApiKey: ${api_key}" \
-H "X-Date: ${req_date}" \
${base_path}${path_uri}
fi
from flask import Blueprint, jsonify, request
from flask_login import current_user
from .hmac_decorator import require_hmac
blueprint = Blueprint('api', __name__)
@blueprint.route('/test', methods=['GET', 'POST'])
@require_hmac()
def test():
ret = {'member_id': current_user.member_id}
if request.method == 'POST':
params = request.json
ret = {**ret, **params}
return jsonify(ret)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment