Last active
April 20, 2020 07:22
-
-
Save spllr/d04c76bbc5b748bd819d53d14c9a4091 to your computer and use it in GitHub Desktop.
Demo implementation of "Privacy-Preserving Contact Tracing" introduced by Apple and Google during the COVID‑19 pandemic.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
# Usage: privacy_preserving_contact_tracing_test.rb [options] | |
# | |
# Specific options: | |
# -K, --tracing-key=TRACING_KEY Tracing key, 32 bytes, HEX encoded | |
# -DDAILY_TRACING_KEY, Daily tracing key, 16 bytes, HEX encoded | |
# --daily-tracing-key When provided TRACING_KEY is ignored | |
# -t, --time=TIME Date and/ or time to use start | |
# -d, --days=NUM_DAYS Number of days to generate identifiers for | |
# -r, --full-day Generate all identifier for the day. Ignored if -d is provided | |
# -f, --format=FORMAT Select output format (txt, csv) | |
# -o, --output=FILE Path to write output to | |
## | |
# Demo implementation of "Privacy-Preserving Contact Tracing" introduced by | |
# Apple and Google during the COVID‑19 pandemic. | |
# | |
# This code is written solely for the purpose of getting a better understanding | |
# of what is proposed. | |
# | |
# @author Klaas Speller | |
# @see https://www.apple.com/newsroom/2020/04/apple-and-google-partner-on-covid-19-contact-tracing-technology/ | |
# @see https://www.apple.com/covid19/contacttracing/ | |
# | |
# LICENSE: | |
# | |
# Copyright 2020 Klaas Speller <[email protected], @spllr> | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in | |
# all copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
# SOFTWARE. | |
# | |
require 'openssl' | |
require 'optparse' | |
require 'optparse/time' | |
require 'csv' | |
## | |
# ContactTracing Crypto | |
# | |
# The CT Module contains all functions described in the | |
# *ContactTracing-CryptographySpecification.pdf*. | |
# | |
# The implementation tries to stay close to the definitions of each function.
# | |
# @see https://covid19-static.cdn-apple.com/applications/covid19/current/static/contact-tracing/pdf/ContactTracing-CryptographySpecification.pdf | |
# | |
module CT
  # Length of one TimeIntervalNumber window, in seconds.
  TEN_MINUTES = 60 * 10
  # Length of one DayNumber window, in seconds.
  ONE_DAY = 60 * 60 * 24

  class << self
    ##
    # DayNumber
    #
    # Provides a number for each 24-hour window. These time windows are based
    # on Unix Epoch Time.
    #
    #   DayNumber = (Number of Seconds since Epoch) / (60 × 60 × 24)
    #
    # DayNumber is encoded as a 32-bit (uint32_t) unsigned little-endian value
    # by the callers that feed it into a key derivation.
    #
    # @param seconds_since_epoch [Integer] Unix timestamp, defaults to now
    # @return [Integer] the day number
    #
    def day_number(seconds_since_epoch = Time.now.to_i)
      seconds_since_epoch / ONE_DAY
    end

    ##
    # Seconds Since Start of DayNumber
    #
    #   Number of Seconds since Epoch % (60 × 60 × 24)
    #
    # @param seconds_since_epoch [Integer] Unix timestamp, defaults to now
    # @return [Integer] seconds elapsed inside the current day window
    #
    def second_since_start_of_day(seconds_since_epoch = Time.now.to_i)
      seconds_since_epoch % ONE_DAY
    end

    ##
    # Unix timestamp of the first second of the day window containing
    # +seconds_since_epoch+.
    #
    def start_of_day(seconds_since_epoch = Time.now.to_i)
      seconds_since_epoch - second_since_start_of_day(seconds_since_epoch)
    end

    ##
    # Unix timestamp of the first second of the day window that follows the
    # one containing +seconds_since_epoch+ (i.e. an exclusive end).
    #
    def end_of_day(seconds_since_epoch = Time.now.to_i)
      start_of_day(seconds_since_epoch) + ONE_DAY
    end

    ##
    # TimeIntervalNumber
    #
    # Provides a number for each 10-minute window in a 24-hour window as
    # defined by DayNumber. This value will be in the [0,143] interval.
    #
    #   TimeIntervalNumber = Seconds Since Start of DayNumber / (60 × 10)
    #
    # @param seconds_since_epoch [Integer] Unix timestamp, defaults to now
    # @return [Integer] interval number within the day
    #
    def time_interval_number(seconds_since_epoch = Time.now.to_i)
      second_since_start_of_day(seconds_since_epoch) / TEN_MINUTES
    end

    ##
    # CRNG designates a cryptographic random number generator.
    #
    #   Output ← CRNG(OutputLength)
    #
    # @param output_length [Integer] number of random bytes to produce
    # @return [String] +output_length+ random bytes
    #
    def CRNG(output_length)
      OpenSSL::Random.random_bytes(output_length)
    end

    ##
    # HMAC
    #
    # HMAC designates the HMAC function as defined by IETF RFC 2104, using the
    # SHA-256 hash function:
    #
    #   Output ← HMAC(Key, Data)
    #
    # The raw digest bytes are returned (not a hex string), so that Truncate
    # operates on bytes and callers can hex-encode the result exactly once.
    #
    # @param key [String] HMAC key bytes
    # @param data [String] message bytes
    # @return [String] 32-byte binary digest
    #
    def HMAC(key, data)
      OpenSSL::HMAC.digest("SHA256", key, data)
    end

    ##
    # HKDF
    #
    # HKDF designates the HKDF function as defined by IETF RFC 5869, using the
    # SHA-256 hash function:
    #
    #   Output ← HKDF(Key, Salt, Info, OutputLength)
    #
    # @param key [String] input keying material
    # @param salt [String, nil] optional salt; nil is passed on as an empty string
    # @param info [String] context/application specific info
    # @param output_length [Integer] number of bytes to derive
    # @return [String] +output_length+ derived bytes
    #
    def HKDF(key, salt, info, output_length)
      OpenSSL::KDF.hkdf(key, salt: salt || '', info: info, length: output_length, hash: "SHA256")
    end

    ##
    # Truncation
    #
    # Truncate defines a truncation function:
    #
    #   Output ← Truncate(Data, L)
    #
    # The Truncate function returns the first L bytes of the data.
    # The input data size being greater or equal to L is a precondition.
    #
    # @param data [String] bytes to truncate
    # @param length [Integer] number of leading bytes to keep
    # @return [String] the first +length+ bytes of +data+
    #
    def truncate(data, length)
      # byteslice keeps this byte-based regardless of the string's encoding.
      data.byteslice(0, length)
    end

    ##
    # Concatenation
    #
    # We use the symbol || to denote concatenation.
    #
    # Both sides are converted to binary strings first so that mixing a text
    # label with packed bytes can never raise an encoding compatibility error.
    #
    # @return [String] binary string +lhs+ followed by +rhs+
    #
    def concat(lhs, rhs)
      lhs.to_s.b + rhs.to_s.b
    end

    ##
    # Tracing Key
    #
    # The Tracing Key is generated when contact tracing is enabled on the
    # device and is securely stored on the device.
    # The 32-byte Tracing Key is derived as follows:
    #
    #   tk ← CRNG(32)
    #
    # The Tracing Key never leaves the device.
    #
    # @return [String] 32 random bytes
    #
    def generate_tracing_key
      CRNG(32)
    end

    ##
    # Daily Tracing Key
    #
    # A Daily Tracing Key is generated for every 24-hour window where the
    # protocol is advertising. From the Tracing Key, we derive the 16-byte
    # Daily Tracing Key in the following way:
    #
    #   dtki ← HKDF(tk,NULL,(UTF8("CT-DTK")||Di),16)
    #
    # where Di is the DayNumber for the 24-hour window the broadcast is in.
    #
    # @param tracking_key [String] the 32-byte Tracing Key
    # @param seconds_since_epoch [Integer] Unix timestamp, defaults to now
    # @return [String] 16-byte Daily Tracing Key
    #
    def generate_daily_tracing_key(tracking_key, seconds_since_epoch = Time.now.to_i)
      di = day_number(seconds_since_epoch)
      # "V" packs Di as a 32-bit unsigned little-endian integer.
      HKDF(tracking_key, nil, concat("CT-DTK", [di].pack("V")), 16)
    end

    ##
    # Rolling Proximity Identifier
    #
    # The Rolling Proximity Identifiers are privacy-preserving identifiers that
    # are sent in Bluetooth Advertisements.
    # Each time the Bluetooth MAC address changes, we derive a new Rolling
    # Proximity Identifier:
    #
    #   RPIi,j ← Truncate(HMAC(dtki,(UTF8("CT-RPI")||TINj)),16)
    #
    # Where:
    #
    # • TINj is the TimeIntervalNumber for the time at which the BLE MAC
    #   address changes.
    #
    # The 16-byte Rolling Proximity Identifier is broadcasted over Bluetooth
    # LE. The use of 16-byte Contact Tracing identifiers yields a low probability
    # of collisions and limits the risk of false positive matches, while keeping
    # device storage requirements low.
    #
    # @param tracing_key [String, nil] Tracing Key, used to derive the daily
    #   key when +daily_tracing_key+ is not given
    # @param daily_tracing_key [String, nil] Daily Tracing Key; takes
    #   precedence over +tracing_key+
    # @param time [Integer] Unix timestamp, defaults to now
    # @return [Array(String, String, Integer)] the 16-byte identifier, the
    #   daily tracing key that was used and the time interval number
    # @raise [ArgumentError] when neither key is provided
    #
    def generate_rolling_proximity_identifier(tracing_key: nil, daily_tracing_key: nil, time: Time.now.to_i)
      if tracing_key.nil? && daily_tracing_key.nil?
        raise ArgumentError, "either tracing_key or daily_tracing_key is required"
      end

      tinj = time_interval_number(time)
      dtki = daily_tracing_key || generate_daily_tracing_key(tracing_key, time)
      # TINj is in [0,143], so it is packed as a single unsigned byte.
      rpii_j = truncate(HMAC(dtki, concat("CT-RPI", [tinj].pack("C"))), 16)
      [rpii_j, dtki, tinj]
    end
  end
end
##
# Options
#
# Defaults for the command line options; the parser below fills in the rest.
#
options = {
  format: :csv
}

# Output formats accepted by --format.
FORMATS = [:txt, :csv].freeze

##
# Decode a HEX encoded key passed on the command line.
#
# String#pack("H*") silently turns non-hex characters into arbitrary bytes, so
# the input is validated first instead of producing garbage key material.
#
# @param hex [String] the HEX encoded key
# @param expected_bytes [Integer] required key length in bytes
# @return [String] the decoded binary key
# @raise [OptionParser::InvalidArgument] when +hex+ is not exactly
#   +expected_bytes+ bytes of HEX
#
def decode_hex_key(hex, expected_bytes)
  unless hex.match?(/\A\h+\z/) && hex.length == expected_bytes * 2
    raise OptionParser::InvalidArgument, "expected #{expected_bytes} bytes, HEX encoded"
  end

  [hex].pack("H*")
end

OptionParser.new do |opts|
  opts.banner = "Usage: privacy_preserving_contact_tracing_test.rb [options]"
  opts.separator ""
  opts.separator "Specific options:"

  opts.on("-KTRACING_KEY", "--tracing-key=TRACING_KEY", "Tracing key, 32 bytes, HEX encoded ") do |key|
    options[:tracing_key] = decode_hex_key(key, 32)
  end

  opts.on("-DDAILY_TRACING_KEY", "--daily-tracing-key=DAILY_TRACING_KEY", "Daily tracing key, 16 bytes, HEX encoded", "When provided TRACING_KEY is ignored") do |key|
    options[:daily_tracing_key] = decode_hex_key(key, 16)
  end

  # The Time acceptor comes from optparse/time; the value is stored as a Unix timestamp.
  opts.on("-tTIME", "--time=TIME", Time, "Date and/ or time to use start") do |time|
    options[:time] = time.to_i
  end

  opts.on("-dNUM_DAYS", "--days=NUM_DAYS", Integer, "Number of days to generate identifiers for") do |days|
    options[:days] = days
  end

  opts.on("-r", "--full-day", "Generate all identifier for the day. Ignored if -d is provided") do |gen_all|
    options[:gen_all] = gen_all
  end

  opts.on("-fFORMAT", "--format=FORMAT", FORMATS, "Select output format (#{FORMATS.join(", ")})") do |type|
    options[:format] = type
  end

  opts.on("-oFILE", "--output=FILE", String, "Path to write output to") do |pathname|
    options[:pathname] = File.expand_path(pathname)
  end
end.parse!
##
# Write one identifier as a human readable text record.
#
# The record is written to options[:output]. Keys and the identifier are HEX
# encoded; a missing tracing key is shown as a row of "x" characters.
#
def dump_txt(options, tracing_key: nil, daily_tracing_key: nil, rolling_identifier: nil, time: 0)
  sink = options[:output]
  to_hex = ->(bytes) { bytes.unpack1("H*") }

  sink.write "-------\n"
  sink.write "time: #{time}\n"
  sink.write "day number: #{CT.day_number(time)}\n"
  sink.write "time interval number: #{CT.time_interval_number(time)}\n"
  if tracing_key
    sink.write "tracing key: #{to_hex.call(tracing_key)}\n"
  else
    sink.write "tracing key: xxxxxxxxxxxxxxxxxxx\n"
  end
  sink.write "daily tracing key: #{to_hex.call(daily_tracing_key)}\n"
  sink.write "rolling proximity identifier: #{to_hex.call(rolling_identifier)}\n"
  sink.write "\n"
end
##
# Append one identifier as a CSV row to options[:output].
#
# Column order: tracing key, daily tracing key, rolling proximity identifier,
# time, day number, time interval number. Keys and the identifier are HEX
# encoded; a missing tracing key is shown as a row of "x" characters.
#
def dump_csv(options, tracing_key: nil, daily_tracing_key: nil, rolling_identifier: nil, time: 0)
  tracing_key_hex = tracing_key ? tracing_key.unpack1("H*") : "xxxxxxxxxxxxxxxxxxx"
  daily_key_hex = daily_tracing_key.unpack1("H*")
  identifier_hex = rolling_identifier.unpack1("H*")

  row = [tracing_key_hex, daily_key_hex, identifier_hex, time.to_s]
  row.push(CT.day_number(time).to_s, CT.time_interval_number(time).to_s)

  options[:output] << row
end
##
# Generate the rolling proximity identifier for +time+ and hand it to the
# writer matching options[:format] (:txt or :csv).
#
# A progress dot is written to stderr unless the output goes to stdout.
# Any other format value writes nothing.
#
def dump_info(options, tracing_key: nil, daily_tracing_key: nil, time: 0)
  identifier, daily_key, _interval = CT.generate_rolling_proximity_identifier(
    tracing_key: tracing_key,
    daily_tracing_key: daily_tracing_key,
    time: time
  )

  $stderr.write "." if options[:output] != $stdout

  record = {
    tracing_key: tracing_key,
    daily_tracing_key: daily_key,
    rolling_identifier: identifier,
    time: time
  }

  case options[:format]
  when :txt then dump_txt(options, **record)
  when :csv then dump_csv(options, **record)
  end
end
##
# Generate rolling proximity identifiers (RPI)
#
# Works out the time range from the parsed options, opens the requested
# output and emits one identifier per ten-minute step (or a single identifier
# when the range is no longer than ten minutes).
#
start_time = options[:time] || Time.now.to_i
end_time = options[:days] ? (start_time + options[:days] * CT::ONE_DAY) : start_time

# A daily tracing key, when given, replaces the tracing key entirely.
dtk = options[:daily_tracing_key]
tk = dtk ? nil : (options[:tracing_key] || CT.generate_tracing_key)

time_range = if options[:gen_all]
  CT.start_of_day(start_time)...CT.end_of_day(end_time)
else
  start_time...end_time
end

# Range#size is O(1) for integer ranges; Range#count would walk every second.
single_identifier = time_range.size <= CT::TEN_MINUTES
num_items = single_identifier ? 1 : time_range.step(CT::TEN_MINUTES).size

$stderr.write "generating #{num_items} rolling identifiers\n"
$stderr.write "format: #{options[:format]}\n"
$stderr.write "output: #{options[:pathname] ? options[:pathname] : 'stdout'}\n\n\n"

begin
  case options[:format]
  when :txt
    options[:output] = options[:pathname] ? File.open(options[:pathname], "w+") : $stdout
  when :csv
    options[:output] = if options[:pathname]
      CSV.open(options[:pathname], "w+", headers: true)
    else
      CSV.new($stdout, headers: true)
    end
    options[:output] << ["Tracing Key", "Daily Tracing Key", "Rolling Proximity Identifier", "Time", "Day Number", "Time Interval Number"]
  end

  if single_identifier
    dump_info(options, tracing_key: tk, daily_tracing_key: dtk, time: time_range.first)
  else
    time_range.step(CT::TEN_MINUTES) do |time|
      dump_info(options, tracing_key: tk, daily_tracing_key: dtk, time: time)
    end
  end

  $stderr.write "\n\ndone\n"
ensure
  # Close the output file even when generation fails; stdout is left open.
  options[:output].close if options[:pathname] && options[:output]
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment