Last active
November 12, 2023 18:07
-
-
Save danielfaust/a9807b2f2256a49b18564daa8bc6cdf2 to your computer and use it in GitHub Desktop.
This file contains code to check a YubiKey OTP against YubiCloud's v2 protocol servers, for Python 3 and 2.7. The code works but is not well tested; it is only meant for internal use, such as preventing the accidental execution of scripts that perform database schema migrations or similar operations that really should not be run by accident.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import yubikey_otp_check_v2 | |
def verify(otp):
    """Check *otp* for a known key and return a short status message.

    otp: the OTP as ``bytes``; its first 12 bytes are looked up in the
        table of known keys below.

    Returns ``b"ok-<user>"`` when the prefix is known and the checker
    reports the OTP as valid, ``b"not-ok-1"`` when the prefix is known but
    the OTP is not reported valid, and ``b"not-ok-2"`` when the prefix is
    not in the table (no request is made in that case).
    """
    users = {
        b'cccccabcdef1': b'user1',
        b'cccccabcdef2': b'user2',
    }
    user = users.get(otp[:12])
    if user is None:
        return b"not-ok-2"

    # client details obtained from https://upgrade.yubico.com/getapikey/
    client = {'id': '0123456', 'secret': 'base64_encoded_secret'}
    yubikey = yubikey_otp_check_v2.YubiKeyOTPCheck(client['id'], client['secret'])
    yubikey.check_otp(otp)
    if not yubikey.valid:
        return b"not-ok-1"
    return b"ok-" + user
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# Works with YubiCloud v2 on Python 3 and Python 2.7
# code is based on https://bitbucket.org/nagylzs/yubistorm | |
################################################################################################### | |
import sys | |
# True when running under Python 3 or newer; selects the urllib imports below.
IS_PYTHON_3 = not (sys.version_info[0] < 3)
################################################################################################### | |
import copy, uuid, hmac, base64, hashlib, random, traceback | |
if IS_PYTHON_3: | |
import urllib.request as urllib2 | |
import urllib.parse as urlparse | |
urlencode = urlparse.urlencode | |
else: | |
from urllib import urlencode | |
import urllib2 | |
import urlparse | |
################################################################## | |
class YubiKeyOTPCheck(object):
    """Validate YubiKey OTPs against the YubiCloud ``/wsapi/2.0/verify`` API.

    Usage::

        checker = YubiKeyOTPCheck(client_id, base64_client_secret)
        checker.check_otp(otp)
        if checker.valid:
            ...

    After ``check_otp`` returns, ``valid`` tells whether the OTP was
    accepted and ``status`` holds either the status reported by the server
    or one of the local ``RESPONSE_*`` failure markers.
    """
    #-------------------------------------------------------------------------
    # Validation hosts; one is picked at random for every attempt.
    api_servers = [
        "api.yubico.com",
        "api2.yubico.com",
        "api3.yubico.com",
        "api4.yubico.com",
        "api5.yubico.com",
    ]
    # Upper bound on HTTP attempts made for a single verification.
    max_attempts = 9
    # Seconds a single request may block before it counts as failed.
    request_timeout = 10
    #-------------------------------------------------------------------------
    def __init__(self, client_id, client_secret):
        """Store the API credentials.

        client_id: sent as the ``id`` request parameter.
        client_secret: base64-encoded API key; it is decoded once here and
            used as the HMAC-SHA1 key for request and response signatures.
        """
        self.status = 'INITIALIZED'
        self.valid = False
        self.client_id = client_id
        self.client_secret = base64.b64decode(client_secret)
    #-------------------------------------------------------------------------
    def _create_signature(self, params):
        """Return the base64 HMAC-SHA1 signature of *params* as text.

        The signed message is ``key=value`` pairs joined with ``&`` in
        sorted key order.
        """
        message = "&".join(
            "%s=%s" % (key, params[key]) for key in sorted(params)
        )
        # UTF-8 yields the same bytes as ASCII for ASCII input, but does not
        # raise on an unexpected non-ASCII value in a server reply.
        digest = hmac.new(
            self.client_secret, message.encode("utf-8"), hashlib.sha1
        ).digest()
        return base64.b64encode(digest).decode("ascii")
    #-------------------------------------------------------------------------
    def _verify_signature(self, params):
        """Return True when ``params["h"]`` is a valid signature of the rest.

        Returns False when *params* has no ``h`` entry.
        """
        if "h" not in params:
            return False
        unsigned = dict((key, value) for key, value in params.items() if key != "h")
        expected = self._create_signature(unsigned).encode("ascii")
        received = params["h"]
        if not isinstance(received, bytes):
            received = ("%s" % (received,)).encode("utf-8")
        # Constant-time comparison, so timing does not reveal how much of a
        # forged signature was correct.
        return hmac.compare_digest(expected, received)
    #-------------------------------------------------------------------------
    def _add_query_params(self, url, params):
        """Return *url* with *params* merged into its query and signed.

        Any existing ``h`` parameter is discarded and replaced by a fresh
        signature over the merged query.
        """
        url_parts = list(urlparse.urlparse(url))
        query = dict(urlparse.parse_qsl(url_parts[4]))
        query.update(params)
        query.pop("h", None)
        query["h"] = self._create_signature(query)
        url_parts[4] = urlencode(query)
        return urlparse.urlunparse(url_parts)
    #-------------------------------------------------------------------------
    def _fetch(self, params):
        """Send the signed request, retrying on ``URLError``.

        Returns ``(code, text)``: the HTTP status code and decoded body of
        the first request that got a response, or ``(0, None)`` when every
        attempt failed. ``text`` is None when the body could not be read
        or decoded.
        """
        code = 0
        txt = None
        for remaining_attempts in range(self.max_attempts, 0, -1):
            server_host = random.choice(self.api_servers)
            print('remaining_attempts = %d ( using server %s )' % (remaining_attempts, server_host))
            url = self._add_query_params("https://" + server_host + "/wsapi/2.0/verify", params)
            print(url)
            try:
                result = urllib2.urlopen(url, timeout=self.request_timeout)
            except urllib2.URLError:
                # Covers HTTP error statuses too (HTTPError is a URLError).
                continue
            try:
                code = result.getcode()
                try:
                    txt = result.read().decode("UTF-8")
                except Exception:
                    # Best effort: an unreadable body is reported as
                    # text=None to the caller instead of raising.
                    traceback.print_exc()
            finally:
                result.close()
            break
        return code, txt
    #-------------------------------------------------------------------------
    def _check(self, params):
        """Fetch the server reply and parse it into a dict.

        A 200 reply with a body is parsed as ``key=value`` lines; blank
        lines and lines without ``=`` are skipped. Any other outcome
        yields ``{'SERVER_CODE': code}``.
        """
        code, txt = self._fetch(params)
        if code != 200 or txt is None:
            return {'SERVER_CODE': code}
        data = {}
        for line in txt.strip().splitlines():
            # Split on the first '=' only: base64 values end in '=' padding.
            key, separator, value = line.strip().partition('=')
            if separator:
                data[key] = value
        return data
    #-------------------------------------------------------------------------
    def check_otp(self, otp):
        """Verify *otp* with the server and update ``valid`` and ``status``.

        otp: the OTP as bytes or text.

        ``valid`` becomes True only when the reply carries a correct
        signature, reports status ``OK`` and echoes the ``otp`` and
        ``nonce`` of this request. Returns the parsed reply dict.
        """
        self.valid = False
        if isinstance(otp, bytes):
            otp = otp.decode()
        params = {
            'id': self.client_id,
            'nonce': uuid.uuid4().hex,
            'timestamp': "1",
            'otp': otp,
        }
        response = self._check(params)
        print(response)
        if not self._verify_signature(response):
            self.status = 'RESPONSE_SIGNATURE_VERIFICATION_FAILED'
            return response
        self.status = response.get('status', 'RESPONSE_STATUS_MISSING')
        if self.status == 'OK':
            # A correctly signed OK is only meaningful for this request: the
            # reply must echo the OTP and nonce that were sent, otherwise a
            # signed reply captured from another request could be replayed.
            echoed = (
                response.get('otp') == params['otp']
                and response.get('nonce') == params['nonce']
            )
            if echoed:
                self.valid = True
            else:
                self.status = 'RESPONSE_OTP_OR_NONCE_MISMATCH'
        return response
    #-------------------------------------------------------------------------
################################################################## |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment