Skip to content

Instantly share code, notes, and snippets.

@ismasan
Last active August 22, 2024 10:50
Show Gist options
  • Save ismasan/8e76decb9187a3fe4426d71135ba9171 to your computer and use it in GitHub Desktop.
Save ismasan/8e76decb9187a3fe4426d71135ba9171 to your computer and use it in GitHub Desktop.
Turn a Ruby Enumerator into an IO-like object
# frozen_string_literal: true
# Wraps an enumerator that yields chunks of content into an IO object.
# The IO is NOT rewindable.
# It implements some essential IO methods:
#
# * IO#read
# * IO#readpartial
# * IO#gets
# * IO#size
# * IO#pos
# * IO#eof?
# * IO#close
#
# Adapted from https://github.com/janko/down/blob/master/lib/down/chunked_io.rb
# @example
# io = EnumeratorIO.new(["h1,h2,h3\na", ",b,c\nd,", "e,f\ng,h,i"].to_enum)
# rows = CSV.parse(io, headers: true)
#
class EnumeratorIO
  # Size passed to the constructor (nil when unknown). It is reported as
  # given and never computed from the content.
  attr_reader :size
  # Encoding applied to strings returned by #gets and by #read when called
  # without a length.
  attr_reader :encoding

  # @param enum [Enumerator, #to_enum] source of string chunks. Objects that
  #   are not external enumerators (e.g. an Array) are converted with #to_enum.
  # @param size [Integer, nil] optional total size, exposed through #size
  # @param encoding [Encoding, String, nil] encoding or encoding name; unknown
  #   names and nil fall back to binary
  def initialize(enum, size: nil, encoding: nil)
    @enum = enum.respond_to?(:peek) ? enum : enum.to_enum
    @size = size
    @encoding = find_encoding(encoding || 'binary')
    @buffer = nil # bytes already pulled from the enumerator but not yet read
    @position = 0
    @closed = false
    @chunks_depleted = empty_enum?(@enum)
  end

  # Yields the remaining elements of the underlying enumerator, or returns an
  # Enumerator over them when no block is given. Content already sitting in
  # the internal buffer is not yielded.
  #
  # Raises IOError if closed.
  def each_chunk
    raise IOError, 'closed stream' if closed?
    return enum_for(__method__) unless block_given?

    yield retrieve_chunk until chunks_depleted?
  end

  # Implements IO#read semantics. Without arguments it retrieves and returns
  # all content.
  #
  # With `length` argument returns exactly that number of bytes if they're
  # available.
  #
  # With `outbuf` argument each call will return that same string object,
  # where the value is replaced with retrieved content.
  #
  # If end of file is reached, returns empty string if called without
  # arguments, or nil if called with a positive length. Raises IOError if
  # closed.
  def read(length = nil, outbuf = nil)
    raise IOError, 'closed stream' if closed?

    data = outbuf ? outbuf.clear.force_encoding(Encoding::BINARY) : ''.b
    remaining_length = length
    buffer = nil

    until remaining_length == 0 || eof?
      buffer ||= String.new
      data << readpartial_or_nil(remaining_length, buffer).to_s
      remaining_length = length - data.bytesize if length
    end

    buffer&.clear # deallocate string
    # Only a full read is tagged with the configured encoding; a length-limited
    # read may cut a multibyte character, so it stays binary.
    data.force_encoding(@encoding) unless length
    data unless data.empty? && length && length > 0
  end

  # Implements IO#gets semantics. Without arguments it retrieves lines of
  # content separated by newlines.
  #
  # With `separator` argument it does the following:
  #
  # * if `separator` is a nonempty string returns chunks of content
  #   terminated by that sequence of bytes
  # * if `separator` is an empty string returns paragraphs of content
  #   (content delimited by two newlines)
  # * if `separator` is nil and `limit` is nil returns all content
  # * if `separator` is nil and `limit` is given behaves like `read(limit)`
  #
  # With `limit` argument (either as the only argument or as the second one)
  # returns at most that amount of bytes; the line still ends at the
  # separator when one is found earlier.
  #
  # Returns nil if end of file is reached. Raises IOError if closed.
  def gets(separator_or_limit = $/, limit = nil)
    raise IOError, 'closed stream' if closed?

    if separator_or_limit.is_a?(Integer)
      # gets(limit): default separator, bounded length.
      separator = $/
      limit = separator_or_limit
    else
      separator = separator_or_limit
    end

    if separator.nil?
      return read(limit) if limit

      # Unlike #read, #gets signals end of file with nil instead of "".
      everything = read
      return everything.empty? ? nil : everything
    end

    return ''.b.force_encoding(@encoding) if limit == 0

    separator = "\n\n" if separator.empty?
    data = String.new
    buffer = nil

    until data.include?(separator) || data.bytesize == limit || eof?
      remaining_length = limit - data.bytesize if limit
      data << readpartial(remaining_length, buffer ||= String.new)
    end

    buffer&.clear # deallocate buffer
    line, extra = data.split(separator, 2)
    line << separator if data.include?(separator)
    data.clear # deallocate data

    if extra && !extra.empty?
      # We read past the end of the line: give the tail back to the buffer
      # and take it out of the position that #readpartial already advanced.
      if @buffer
        @buffer.prepend(extra)
      else
        @buffer = extra
      end
      @position -= extra.bytesize
    end

    line&.force_encoding(@encoding)
  end

  # Implements IO#readpartial semantics. If there is any content readily
  # available reads from it, otherwise fetches and reads from the next chunk.
  #
  # Without arguments it either returns all content that's readily available,
  # or the next chunk, capped at the default `maxlen`.
  #
  # With `maxlen` argument returns maximum of that amount of bytes (default
  # is 16KB).
  #
  # With `outbuf` argument each call will return that same string object,
  # where the value is replaced with retrieved content.
  #
  # The returned string is always binary-encoded. Raises EOFError if end of
  # file is reached. Raises IOError if closed.
  def readpartial(maxlen = nil, outbuf = nil)
    raise IOError, 'closed stream' if closed?

    maxlen ||= 16 * 1024
    data = outbuf ? outbuf.clear.force_encoding(Encoding::BINARY) : ''.b
    return data if maxlen == 0

    if @buffer.nil?
      raise EOFError, 'end of file reached' if chunks_depleted?

      @buffer = retrieve_chunk
    end

    if @buffer
      if maxlen < @buffer.bytesize
        head = @buffer.byteslice(0, maxlen)
        # The leftover is our own copy; keep it binary so that #gets can
        # prepend binary data to it without an encoding clash.
        @buffer = @buffer.byteslice(maxlen..-1).force_encoding(Encoding::BINARY)
        data << head
        head.clear # deallocate the temporary slice
      else
        # @buffer may be the very object produced by the enumerator, so it is
        # copied into `data` and released, never cleared or modified.
        data << @buffer
        @buffer = nil
      end
    end

    @position += data.bytesize
    data.force_encoding(Encoding::BINARY)
  end

  # Seeking is not supported; always raises Errno::ESPIPE.
  def seek(_amount, _whence = IO::SEEK_SET)
    raise Errno::ESPIPE, 'Illegal seek'
  end

  # Implements IO#pos semantics. Returns the number of bytes handed out to
  # the reader so far.
  def pos
    @position
  end
  alias tell pos

  # Implements IO#eof? semantics. Returns true when the internal buffer is
  # empty and the enumerator has no more chunks. Answering may require peeking
  # at the next element of the enumerator. Raises IOError if closed.
  def eof?
    raise IOError, 'closed stream' if closed?

    @buffer.nil? && chunks_depleted?
  end

  # Rewinding is not supported. Raises IOError ('closed stream' when closed,
  # 'not rewindable' otherwise).
  def rewind
    raise IOError, 'closed stream' if closed?

    raise IOError, 'not rewindable'
  end

  # Implements IO#close semantics. Drops buffered content and marks the
  # object as closed. Calling it again is a no-op.
  def close
    return if @closed

    @buffer = nil
    @closed = true
  end

  # Returns whether #close has been called.
  def closed?
    !!@closed
  end

  # Returns a short description of the object: enumerator, size, encoding and
  # whether it is closed.
  def inspect
    string = String.new
    string << "#<#{self.class.name}"
    string << " enum=#{@enum.inspect}"
    string << " size=#{size.inspect}"
    string << " encoding=#{encoding.inspect}"
    string << ' (closed)' if closed?
    string << '>'
  end

  private

  # Same as #readpartial, but returns nil instead of raising EOFError.
  def readpartial_or_nil(...)
    readpartial(...)
  rescue EOFError
    nil
  end

  # Returns the next element of the enumerator, or nil (and records
  # depletion) when the enumerator is exhausted.
  def retrieve_chunk
    @enum.next
  rescue StopIteration
    @chunks_depleted = true
    nil
  end

  # Returns whether the enumerator has no more elements. The answer is found
  # by peeking, so it is accurate right after the last chunk was consumed;
  # once true it is remembered.
  def chunks_depleted?
    @chunks_depleted ||= empty_enum?(@enum)
  end

  # Finds encoding by name. If the encoding couldn't be found, falls back to
  # the generic binary encoding.
  def find_encoding(encoding)
    Encoding.find(encoding)
  rescue ArgumentError
    Encoding::BINARY
  end

  # Returns whether the platform name does not match mswin/mingw/cygwin/java.
  # NOTE(review): not called anywhere in this class.
  def posix?
    RUBY_PLATFORM !~ /(mswin|mingw|cygwin|java)/
  end

  # Returns true when `enum` has no next element (peeking raises
  # StopIteration), false otherwise. Does not advance the enumerator.
  def empty_enum?(enum)
    enum.peek
    false
  rescue StopIteration
    true
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment