Created
July 22, 2014 13:17
-
-
Save sstelfox/d10f7c2762af110bdbe8 to your computer and use it in GitHub Desktop.
Simple authenticator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env ruby | |
| require 'openssl' | |
| DEFAULT_INTERVAL = 30 | |
# Minimal Base32 decoder (RFC 4648 alphabet, case-insensitive). Trailing "="
# padding is accepted but not required.
module Base32
  # Raised when the input contains a character outside the Base32 alphabet.
  Base32Error = Class.new(RuntimeError)

  # The Base32 alphabet in lower case; a character's index is its 5-bit value.
  CHARS = "abcdefghijklmnopqrstuvwxyz234567".each_char.to_a

  # Decodes a Base32 string into its raw bytes.
  #
  # The input is processed in groups of up to eight characters; each group is
  # decoded independently by {decode_block}.
  #
  # @param [String] str the Base32 text, with or without "=" padding
  # @return [String] the decoded bytes as a binary (ASCII-8BIT) string
  # @raise [Base32Error] if a character is not part of the alphabet
  def self.decode(str)
    str.scan(/.{1,8}/).flat_map { |block| decode_block(block) }.pack('C*')
  end

  # NOTE: the helpers below were preceded by a bare `protected`, which has no
  # effect on `def self.` methods. They stay callable so existing callers are
  # not broken.

  # Decodes one group of Base32 characters into byte values.
  #
  # Trailing "=" padding is stripped first. Every character carries 5 bits;
  # only complete bytes are returned, and leftover low-order bits that do not
  # fill a byte are discarded (2/4/5/7/8 characters give 1/2/3/4/5 bytes).
  #
  # @param [String] block up to eight Base32 characters, optionally padded
  # @return [Array<Integer>] the decoded byte values (0..255)
  # @raise [Base32Error] if a character is not part of the alphabet
  def self.decode_block(block)
    quints = block.sub(/=+\z/, '').each_char.map { |char| decode_quint(char) }
    bit_count = quints.length * 5
    byte_count = bit_count / 8

    # Concatenate the 5-bit groups into one integer, then drop the trailing
    # bits that do not make up a whole byte.
    bits = quints.reduce(0) { |acc, quint| (acc << 5) | quint } >> (bit_count % 8)

    # Slice the integer into bytes, most significant first.
    Array.new(byte_count) { |i| (bits >> (8 * (byte_count - 1 - i))) & 0xff }
  end

  # Maps a single Base32 character to its 5-bit value.
  #
  # @param [String] q one character, in either case
  # @return [Integer] the value 0..31
  # @raise [Base32Error] if the character is not part of the alphabet
  def self.decode_quint(q)
    CHARS.index(q.downcase) || raise(Base32Error, "Invalid Base32 Character - '#{q}'")
  end
end
# Time-based one-time password generator. The OTP is derived from an HMAC of
# the time-step counter (epoch seconds divided by +interval+), keyed with the
# Base32-decoded secret.
class TOTP
  attr_reader :secret, :digits, :digest, :interval

  # @param [String] s the Base32-encoded shared secret
  # @param [Hash] options
  # @option options [Integer] :interval (DEFAULT_INTERVAL) seconds per time step
  # @option options [Integer] :digits (6) number of digits in the OTP
  # @option options [String] :digest ("sha1") OpenSSL digest name for the HMAC
  def initialize(s, options = {})
    @interval = options[:interval] || DEFAULT_INTERVAL
    @digits = options[:digits] || 6
    @digest = options[:digest] || "sha1"
    @secret = s
  end

  # Generates the OTP for a given moment.
  # Accepts either a Unix timestamp integer or a Time object; only the epoch
  # seconds are used, so the time zone of a Time object is irrelevant and the
  # object is left untouched.
  # @param [Time/Integer] time the time to generate an OTP for
  # @option [Boolean] padding (true) Issue the number as a 0 padded string
  # @return [String/Integer] padded String, or Integer when padding is false
  # @raise [ArgumentError] if the time lies before the Unix epoch
  def at(time, padding=true)
    generate_otp(timecode(time), padding)
  end

  # Generate the current time OTP
  # @option [Boolean] padding (true) Issue the number as a 0 padded string
  # @return [String/Integer] padded String, or Integer when padding is false
  def now(padding=true)
    generate_otp(timecode(Time.now), padding)
  end

  # Verifies the OTP passed in against the OTP for +time+ (default: now).
  # @param [String] otp the OTP to check against
  # @param [Time/Integer] time the moment to verify for
  # @return [Boolean] true when the OTP matches
  # @raise [ArgumentError] if +otp+ is not a String
  def verify(otp, time = Time.now)
    raise ArgumentError, "This OTP only verifies strings" unless otp.is_a?(String)

    time_constant_compare(otp, at(time))
  end

  # Verifies the OTP passed in against the current time OTP
  # and adjacent intervals up to +drift+.
  # @param [String] otp the OTP to check against
  # @param [Integer] drift the number of seconds that the client
  #     and server are allowed to drift apart
  # @param [Time/Integer] time the moment to centre the window on
  # @return [Boolean] true when the OTP matches any timestamp in the window
  # @raise [ArgumentError] if +drift+ is negative or +otp+ is not a String
  def verify_with_drift(otp, drift, time = Time.now)
    raise ArgumentError, "drift must not be negative" if drift < 0

    time = time.to_i
    latest = time + drift
    times = (time - drift..latest).step(interval).to_a
    # Stepping by +interval+ may stop short of the upper bound; check it too.
    times << latest if times.last < latest
    times.any? { |ti| verify(otp, ti) }
  end

  # Computes the OTP for a raw counter value.
  # @param [Integer] input the counter (for TOTP: the time-step number)
  # @param [Boolean] padded (true) return a 0 padded String instead of an Integer
  # @return [String/Integer] the OTP truncated to +digits+ decimal digits
  def generate_otp(input, padded = true)
    hmac = OpenSSL::HMAC.digest(
      OpenSSL::Digest.new(digest),
      byte_secret,
      int_to_bytestring(input)
    )

    # Dynamic truncation: the low nibble of the last byte selects a 4-byte
    # window, which is read as a big-endian integer with the top bit cleared.
    offset = hmac.getbyte(-1) & 0xf
    code = hmac[offset, 4].unpack('N').first & 0x7fffffff
    otp = code % 10**digits

    padded ? otp.to_s.rjust(digits, '0') : otp
  end

  private

  # The shared secret as raw bytes (Base32-decoded).
  def byte_secret
    Base32.decode(@secret)
  end

  # Turns an integer to the OATH specified bytestring, which is fed to the HMAC
  # along with the secret: big-endian, left-padded with NUL bytes to +padding+.
  # @raise [ArgumentError] if +int+ is negative (the shift loop below would
  #     otherwise never terminate, since -1 >> 8 == -1)
  def int_to_bytestring(int, padding = 8)
    raise ArgumentError, "#int_to_bytestring requires a non-negative number" if int < 0

    bytes = []
    until int.zero?
      bytes.unshift(int & 0xFF)
      int >>= 8
    end
    bytes.pack('C*').rjust(padding, 0.chr)
  end

  # constant-time compare the strings: every byte pair is XORed and the
  # differences are accumulated, so the run time does not depend on where the
  # first mismatch occurs.
  def time_constant_compare(a, b)
    return false if a.empty? || b.empty? || a.bytesize != b.bytesize

    a.bytes.zip(b.bytes).reduce(0) { |diff, (x, y)| diff | (x ^ y) }.zero?
  end

  # Number of whole +interval+ steps since the Unix epoch. Uses #to_i only,
  # so Time objects are not converted to UTC in place.
  def timecode(time)
    time.to_i / interval
  end
end
# Command-line entry point: all arguments are concatenated into one Base32
# secret, then the current OTP and its remaining lifetime are printed.
if ARGV.size.zero?
  puts 'You need to provide the secret via the arguments'
  exit 1
end

authenticator = TOTP.new(ARGV.join)
current_code = authenticator.now
# Seconds left until the current time step rolls over.
seconds_left = authenticator.interval - (Time.now.to_i % authenticator.interval)
puts "#{current_code} -> Good for another #{seconds_left} seconds."
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment