Skip to content

Instantly share code, notes, and snippets.

@julik
Created February 7, 2025 16:32
Show Gist options
  • Save julik/ec66f12c9e755e3e8a73943c929c3bcb to your computer and use it in GitHub Desktop.
Save julik/ec66f12c9e755e3e8a73943c929c3bcb to your computer and use it in GitHub Desktop.
AES-CFB with substream-based blocks
# frozen_string_literal: true
require "test_helper"
class BlockwiseEncryptionTest < ActiveSupport::TestCase
# Encrypts a payload as one continuous AES-256-CFB stream.
#
# Both the key and the IV are derived from the supplied secret using HKDF, so
# the same secret always yields the same key/IV pair. Because the payload is
# a single cipher stream, reading any byte range means decrypting all of it.
class NonRandomAccessible
  # Pass-through writer: forwards bytes to another IO and keeps a running
  # total of the bytes that went through it.
  class Writer
    # @return [Integer] total number of bytes forwarded so far
    attr_reader :written

    # @param destination_io [#write] where the bytes get forwarded to
    def initialize(destination_io)
      @written = 0
      @destination_io = destination_io
    end

    # Forwards +bytes+ to the destination IO.
    #
    # @param bytes [String]
    # @return [Integer] the bytesize of +bytes+
    def write(bytes)
      @destination_io.write(bytes)
      bytes.bytesize.tap { |size| @written += size }
    end
  end

  # @param encryption_key [String] secret that the AES key and IV are derived from
  def initialize(encryption_key)
    @algo = "aes-256-cfb"
    @iv_and_key_length = 16 + 32
    @key_derivation_salt = "my desire"

    derived = OpenSSL::KDF.hkdf(
      encryption_key,
      salt: @key_derivation_salt,
      info: "enc",
      length: @iv_and_key_length,
      hash: "sha-512"
    )
    # The first 16 derived bytes become the IV, the rest becomes the key
    @initial_iv = derived.byteslice(0, 16)
    @key = derived.byteslice(16..)
  end

  # Yields an in-memory buffer to the caller, then encrypts everything that
  # was written into that buffer and writes the ciphertext to +into_io+.
  #
  # @param into_io [#write] receives the ciphertext
  # @yieldparam buffer [StringIO] collects the plaintext
  # @return whatever +into_io.write+ returns
  def write(into_io)
    cipher = stream_cipher(:encrypt)
    buffer = StringIO.new
    yield(buffer)
    ciphertext = cipher.update(buffer.string)
    ciphertext << cipher.final
    into_io.write(ciphertext)
  end

  # Decrypts everything that remains to be read from +from_io+.
  #
  # @param from_io [#read] source of the ciphertext
  # @return [String] the plaintext
  def read_whole(from_io)
    cipher = stream_cipher(:decrypt)
    cipher.update(from_io.read) + cipher.final
  end

  # Decrypts the entire stream and slices the requested range out of it.
  #
  # @param from_io [#read] source of the ciphertext
  # @param range [Range] range to cut out of the plaintext
  # @return [String, nil] the same result as String#[] with +range+
  def read_range(from_io, range)
    plaintext = read_whole(from_io)
    plaintext[range]
  end

  private

  # Builds a cipher for the given direction (:encrypt or :decrypt) using the
  # derived IV and key.
  def stream_cipher(direction)
    OpenSSL::Cipher.new(@algo).tap do |cipher|
      direction == :encrypt ? cipher.encrypt : cipher.decrypt
      cipher.iv = @initial_iv
      cipher.key = @key
    end
  end
end
# Variant of NonRandomAccessible that encrypts the payload as consecutive,
# independently encrypted "substreams" of +substream_size+ bytes each (the
# last one may be shorter). Every substream uses its own IV, derived from the
# initial IV and the substream index, so a byte range can be served by
# decrypting only the substreams that overlap it.
class RandomAccessible < NonRandomAccessible
  # @param encryption_key [String] secret that the AES key and IV are derived from
  # @param substream_size [Integer] number of bytes per independently encrypted substream
  def initialize(encryption_key, substream_size:)
    super(encryption_key)
    @substream_size = substream_size
  end

  # Derives the IV for one substream using HKDF, with +iv+ as the input keying
  # material and the decimal substream index as the HKDF "info".
  #
  # @param iv [String] the IV to derive from
  # @param substream_i [Integer] zero-based index of the substream
  # @return [String] derived IV, the same number of bytes as +iv+
  def derive_iv_for_substream_index(iv, substream_i)
    OpenSSL::KDF.hkdf(iv, salt: @key_derivation_salt, info: substream_i.to_s, length: iv.bytesize, hash: "sha-512")
  end

  # Yields a chunker to the caller; every chunk the chunker emits is encrypted
  # as its own substream and written to +into_io+.
  #
  # NOTE(review): the substream index is computed from the number of bytes
  # written so far, which presumes ETL::ByteChunker emits chunks of exactly
  # +chunk_size+ bytes except possibly the last one - confirm against
  # ETL::ByteChunker.
  #
  # @param into_io [#write] receives the ciphertext
  # @yieldparam chunker [ETL::ByteChunker] object the caller writes plaintext into
  # @return whatever +chunker.finish+ returns
  def write(into_io)
    written = 0
    chunker = ETL::ByteChunker.new(chunk_size: @substream_size) do |bytes|
      c = substream_cipher(:encrypt, written / @substream_size)
      into_io.write(c.update(bytes) + c.final)
      written += bytes.bytesize
    end
    yield chunker
    chunker.finish
  end

  # Decrypts all substreams in order. The number of substreams is computed
  # from +from_io.size+, and they are read sequentially from the current
  # position of +from_io+.
  #
  # @param from_io [#size, #read] source of the ciphertext
  # @return [String] the plaintext
  def read_whole(from_io)
    n_substreams, last_substream_length = from_io.size.divmod(@substream_size)
    n_substreams += 1 if last_substream_length > 0
    to_io = StringIO.new
    n_substreams.times do |substream_i|
      c = substream_cipher(:decrypt, substream_i)
      to_io.write(c.update(from_io.read(@substream_size)) + c.final)
    end
    to_io.string
  end

  # Decrypts only the substreams overlapping +range+ and returns the requested
  # bytes. Only non-negative offsets are handled. A range reaching past the
  # end of the stream returns the bytes that are available.
  #
  # @param from_io [#seek, #read, #eof?, #size] source of the ciphertext
  #   (+size+ is only called for endless ranges)
  # @param range [Range] byte range of the plaintext to return
  # @return [String, nil] the requested bytes; nil when a non-empty range
  #   starts at or past the end of the stream
  def read_range(from_io, range)
    at_offset = range.begin || 0
    n_bytes = if range.end
      range.end - at_offset + (range.exclude_end? ? 0 : 1)
    else
      # An endless range stops at the end of the stream, which is already an
      # exclusive bound - so no +1 here
      from_io.size - at_offset
    end
    # An inverted range is an empty read, not a negative-length one
    n_bytes = 0 if n_bytes.negative?

    # Find where we need to start reading from
    skip_substreams, offset_into_first_substream = at_offset.divmod(@substream_size)

    # The requested bytes begin offset_into_first_substream bytes into the
    # first substream, so that lead-in has to be included when counting the
    # substreams to decrypt. Without it a range starting mid-substream can end
    # up one substream short and get truncated.
    n_substreams_to_read = if n_bytes.positive?
      (offset_into_first_substream + n_bytes + @substream_size - 1) / @substream_size
    else
      0
    end

    plain_io = StringIO.new
    n_substreams_to_read.times do |n|
      substream_i = skip_substreams + n
      from_io.seek(substream_i * @substream_size)
      break if from_io.eof?

      # Decrypt the entire substream
      c = substream_cipher(:decrypt, substream_i)
      plain_io.write(c.update(from_io.read(@substream_size)) + c.final)
    end

    plain_io.seek(offset_into_first_substream)
    plain_io.read(n_bytes)
  end

  private

  # Builds a cipher for the given direction (:encrypt or :decrypt), set up
  # with the IV of the substream at index +substream_i+.
  def substream_cipher(direction, substream_i)
    c = OpenSSL::Cipher.new(@algo)
    direction == :encrypt ? c.encrypt : c.decrypt
    c.iv = derive_iv_for_substream_index(@initial_iv, substream_i)
    c.key = @key
    c
  end
end
# Round-trips a short string through NonRandomAccessible and verifies both the
# full readback and a set of range reads against String#[].
test "has a working non-random-access-version" do
  plaintext = "Well amazing, incredible really"
  cipher = NonRandomAccessible.new("sekret")

  encrypted_io = StringIO.new
  cipher.write(encrypted_io) do |sink|
    sink.write(plaintext)
  end

  # Full readback must return exactly what went in
  encrypted_io.rewind
  decrypted = cipher.read_whole(encrypted_io)
  assert_equal plaintext.bytesize, decrypted.bytesize
  assert_equal plaintext, decrypted

  # Range reads must agree with slicing the plaintext using the same range
  [0..0, 0...0, 0..1, (0..), (0...), 3..7, 3...7, 3..128].each do |byte_range|
    encrypted_io.rewind
    assert_equal plaintext[byte_range], cipher.read_range(encrypted_io, byte_range)
  end
end
# Round-trips a short string through RandomAccessible, using a substream size
# that is much smaller than the payload so that the payload spans many
# substreams, then verifies the full readback and a set of range reads
# against String#[].
test "has a working random-access-version" do
  plaintext = "Well amazing, incredible really"
  cipher = RandomAccessible.new("sekret", substream_size: 3)

  encrypted_io = StringIO.new
  cipher.write(encrypted_io) do |sink|
    sink.write(plaintext)
  end

  # Full readback must return exactly what went in
  encrypted_io.rewind
  decrypted = cipher.read_whole(encrypted_io)
  assert_equal plaintext.bytesize, decrypted.bytesize
  assert_equal plaintext, decrypted

  # Range reads must agree with slicing the plaintext using the same range
  [0..0, 0...0, 0..1, (0..), (0...), 3..7, 3...7, 3..128].each do |byte_range|
    encrypted_io.rewind
    assert_equal plaintext[byte_range], cipher.read_range(encrypted_io, byte_range)
  end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment