Created
February 7, 2025 16:32
-
-
Save julik/ec66f12c9e755e3e8a73943c929c3bcb to your computer and use it in GitHub Desktop.
AES-CFB with substream-based blocks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
require "test_helper" | |
class BlockwiseEncryptionTest < ActiveSupport::TestCase | |
# Encrypts a whole payload as one continuous AES-256-CFB stream. The AES key
# and the IV are both derived from the supplied encryption key with HKDF.
# Decryption always starts at the first byte, so reading a range means
# decrypting everything and slicing the resulting plaintext.
#
# NOTE(review): key and IV depend only on encryption_key and the constants
# below, so every payload written with the same key shares the same IV.
class NonRandomAccessible
  # @param encryption_key [String] input keying material handed to HKDF
  def initialize(encryption_key)
    @algo = "aes-256-cfb"
    @iv_and_key_length = 16 + 32 # 16 bytes of IV followed by 32 bytes of AES-256 key
    @key_derivation_salt = "my desire"
    # "SHA512" names the same digest as the hyphenated "sha-512" alias, but is the
    # spelling every OpenSSL build registers; the alias may be rejected by older ones.
    key_and_iv = OpenSSL::KDF.hkdf(
      encryption_key,
      salt: @key_derivation_salt,
      info: "enc",
      length: @iv_and_key_length,
      hash: "SHA512"
    )
    @initial_iv = key_and_iv[...16]
    @key = key_and_iv[16..]
  end

  # Pass-through writer that forwards bytes to destination_io and keeps a
  # running total of how many bytes went through it. The methods of the
  # enclosing class do not use it.
  class Writer
    # @return [Integer] total number of bytes written so far
    attr_reader :written

    # @param destination_io [#write] sink that receives every write
    def initialize(destination_io)
      @destination_io = destination_io
      @written = 0
    end

    # @param bytes [String] data to forward
    # @return [Integer] bytesize of the data that was forwarded
    def write(bytes)
      @destination_io.write(bytes)
      n_bytes = bytes.bytesize
      @written += n_bytes
      n_bytes
    end
  end

  # Yields an in-memory buffer to the caller, then encrypts everything that was
  # written into the buffer in one go and writes the ciphertext to into_io.
  #
  # @param into_io [#write] destination for the ciphertext
  # @yieldparam buffer [StringIO] collects the plaintext
  # @return whatever into_io.write returns
  def write(into_io)
    buffer = StringIO.new
    yield(buffer)
    into_io.write(run_cipher(:encrypt, buffer.string))
  end

  # Decrypts the remaining contents of from_io.
  #
  # @param from_io [#read] source of the ciphertext
  # @return [String] the plaintext bytes
  def read_whole(from_io)
    run_cipher(:decrypt, from_io.read)
  end

  # Decrypts everything and slices the plaintext with String#[].
  #
  # @param from_io [#read] source of the ciphertext
  # @param range [Range] slice of the plaintext to return
  # @return [String, nil] whatever String#[] yields for the range
  def read_range(from_io, range)
    read_whole(from_io)[range]
  end

  private

  # Runs data through a fresh cipher configured with the derived key and IV.
  # mode is :encrypt or :decrypt.
  def run_cipher(mode, data)
    cipher = OpenSSL::Cipher.new(@algo)
    cipher.public_send(mode) # the direction must be chosen before key/iv are assigned
    cipher.key = @key
    cipher.iv = @initial_iv
    # Some versions of the openssl gem raise ArgumentError when update is
    # given an empty string, so an empty payload skips update altogether.
    return cipher.final if data.empty?

    cipher.update(data) + cipher.final
  end
end
# Variant of NonRandomAccessible that splits the payload into fixed-size
# "substreams". Each substream is encrypted with the shared key but with its
# own IV derived from the substream index, so a byte range can be decrypted by
# touching only the substreams that overlap it.
class RandomAccessible < NonRandomAccessible
  # @param encryption_key [String] input keying material, passed to the superclass
  # @param substream_size [Integer] number of bytes per independently encrypted substream
  def initialize(encryption_key, substream_size:)
    super(encryption_key)
    @substream_size = substream_size
  end

  # Derives the IV for one substream with HKDF, using the given IV as keying
  # material, the shared salt, and the decimal substream index as the info string.
  #
  # @param iv [String] base IV; the result has the same bytesize
  # @param substream_i [Integer] zero-based substream index
  # @return [String] the derived IV
  def derive_iv_for_substream_index(iv, substream_i)
    # "SHA512" names the same digest as the hyphenated "sha-512" alias, but is the
    # spelling every OpenSSL build registers; the alias may be rejected by older ones.
    OpenSSL::KDF.hkdf(
      iv,
      salt: @key_derivation_salt,
      info: substream_i.to_s,
      length: iv.bytesize,
      hash: "SHA512"
    )
  end

  # Yields a chunker to the caller and encrypts every chunk it emits as its own
  # substream, appending the ciphertext to into_io.
  #
  # NOTE(review): ETL::ByteChunker is not visible here. The index arithmetic
  # assumes it calls the block with chunks of exactly substream_size bytes,
  # except possibly the last one emitted by finish - confirm.
  #
  # @param into_io [#write] destination for the ciphertext
  # @yieldparam chunker [ETL::ByteChunker] receives the plaintext
  def write(into_io)
    bytes_written = 0
    chunker = ETL::ByteChunker.new(chunk_size: @substream_size) do |bytes|
      substream_i = bytes_written / @substream_size
      into_io.write(crypt_substream(:encrypt, substream_i, bytes))
      bytes_written += bytes.bytesize
    end
    yield chunker
    chunker.finish
  end

  # Decrypts every substream of from_io in order, reading from its current position.
  #
  # @param from_io [#size, #read] source of the ciphertext
  # @return [String] the concatenated plaintext
  def read_whole(from_io)
    n_substreams, last_substream_length = from_io.size.divmod(@substream_size)
    n_substreams += 1 if last_substream_length > 0

    plain_io = StringIO.new
    n_substreams.times do |substream_i|
      plain_io.write(crypt_substream(:decrypt, substream_i, from_io.read(@substream_size)))
    end
    plain_io.string
  end

  # Decrypts only the substreams that overlap the requested byte range.
  # Negative range bounds are not interpreted relative to the end.
  #
  # @param from_io [#size, #seek, #eof?, #read] source of the ciphertext
  # @param range [Range] byte range of the plaintext; begin and end may be nil
  # @return [String, nil] the requested bytes, truncated at the end of the data
  def read_range(from_io, range)
    at_offset = range.begin || 0
    n_bytes =
      if range.end.nil?
        # Open-ended: everything from the offset to the end of the data.
        from_io.size - at_offset
      elsif range.exclude_end?
        range.end - at_offset
      else
        range.end - at_offset + 1
      end

    # Find where we need to start reading from
    skip_substreams, offset_into_first_substream = at_offset.divmod(@substream_size)
    # The decrypted window starts at a substream boundary, so it has to cover the
    # bytes skipped inside the first substream as well as the requested ones.
    # Counting only n_bytes truncates ranges that start mid-substream.
    window_length = offset_into_first_substream + n_bytes
    n_substreams_to_read = (window_length + @substream_size - 1) / @substream_size

    plain_io = StringIO.new
    n_substreams_to_read.times do |n|
      substream_i = skip_substreams + n
      from_io.seek(substream_i * @substream_size)
      break if from_io.eof?

      # Decrypt the entire substream
      plain_io.write(crypt_substream(:decrypt, substream_i, from_io.read(@substream_size)))
    end
    plain_io.seek(offset_into_first_substream)
    plain_io.read(n_bytes)
  end

  private

  # Encrypts or decrypts one substream with a fresh cipher keyed with the shared
  # key and the IV derived for that substream. mode is :encrypt or :decrypt.
  def crypt_substream(mode, substream_i, bytes)
    cipher = OpenSSL::Cipher.new(@algo)
    cipher.public_send(mode) # the direction must be chosen before key/iv are assigned
    cipher.key = @key
    cipher.iv = derive_iv_for_substream_index(@initial_iv, substream_i)
    cipher.update(bytes) + cipher.final
  end
end
# Round-trips a short string through the single-stream encryptor, then checks
# that ranged reads agree with slicing the plaintext directly.
test "has a working non-random-access-version" do
  plaintext = "Well amazing, incredible really"
  codec = NonRandomAccessible.new("sekret")

  ciphertext_io = StringIO.new
  codec.write(ciphertext_io) { |sink| sink.write(plaintext) }

  ciphertext_io.rewind
  decrypted = codec.read_whole(ciphertext_io)
  assert_equal plaintext.bytesize, decrypted.bytesize
  assert_equal plaintext, decrypted

  # Closed, half-open, open-ended and past-the-end ranges
  [0..0, 0...0, 0..1, 0.., 0..., 3..7, 3...7, 3..128].each do |byte_range|
    ciphertext_io.rewind
    expected = plaintext[byte_range]
    assert_equal expected, codec.read_range(ciphertext_io, byte_range)
  end
end
# Round-trips a short string through the substream-based encryptor (3-byte
# substreams), then checks that ranged reads agree with slicing the plaintext.
test "has a working random-access-version" do
  plaintext = "Well amazing, incredible really"
  codec = RandomAccessible.new("sekret", substream_size: 3)

  ciphertext_io = StringIO.new
  codec.write(ciphertext_io) { |sink| sink.write(plaintext) }

  ciphertext_io.rewind
  decrypted = codec.read_whole(ciphertext_io)
  assert_equal plaintext.bytesize, decrypted.bytesize
  assert_equal plaintext, decrypted

  # Closed, half-open, open-ended and past-the-end ranges
  [0..0, 0...0, 0..1, 0.., 0..., 3..7, 3...7, 3..128].each do |byte_range|
    ciphertext_io.rewind
    expected = plaintext[byte_range]
    assert_equal expected, codec.read_range(ciphertext_io, byte_range)
  end
end
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment