Last active
August 22, 2024 10:50
-
-
Save ismasan/8e76decb9187a3fe4426d71135ba9171 to your computer and use it in GitHub Desktop.
Turn a Ruby Enumerator into an IO-like object
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
# Wraps an enumerator that yields chunks of content into an IO-like object.
# The IO is NOT rewindable. It implements some essential IO methods:
# | |
# * IO#read | |
# * IO#readpartial | |
# * IO#gets | |
# * IO#size | |
# * IO#pos | |
# * IO#eof? | |
# * IO#close | |
# | |
# Adapted from https://github.com/janko/down/blob/master/lib/down/chunked_io.rb | |
# @example | |
# io = EnumeratorIO.new(["h1,h2,h3\na", ",b,c\nd,", "e,f\ng,h,i"].to_enum) | |
# rows = CSV.parse(io, headers: true) | |
# | |
class EnumeratorIO
  attr_reader :size, :encoding

  # @param enum [Enumerator, Enumerable] source of String chunks. Objects that
  #   do not respond to #next (e.g. an Array) are converted with #to_enum.
  # @param size [Integer, nil] total content size in bytes, if known; it is
  #   only reported back through #size.
  # @param encoding [String, Encoding, nil] encoding applied to strings
  #   returned by #read (without a length) and #gets. Unknown names fall back
  #   to binary; defaults to binary.
  def initialize(enum, size: nil, encoding: nil)
    @enum = enum.respond_to?(:next) ? enum : enum.to_enum
    @size = size
    @encoding = find_encoding(encoding || 'binary')
    @buffer = nil # binary String of fetched-but-unread bytes, never empty
    @position = 0
    @closed = false
    # Depletion is discovered lazily (on StopIteration), so constructing the
    # IO never pulls a chunk from the enumerator.
    @chunks_depleted = false
  end

  # Yields the remaining content chunk by chunk: first any bytes already
  # buffered by #read, #gets, #readpartial or #eof? (as one binary String),
  # then every remaining element of the underlying enumerator as-is.
  # Does not advance #pos.
  #
  # Returns an Enumerator when called without a block. Raises IOError if
  # closed.
  def each_chunk
    raise IOError, 'closed stream' if closed?
    return enum_for(__method__) unless block_given?

    if @buffer
      leftover = @buffer
      @buffer = nil
      yield leftover
    end

    until chunks_depleted?
      chunk = retrieve_chunk
      # retrieve_chunk returns nil and marks depletion once the enumerator is
      # exhausted; that sentinel must not be yielded as a chunk.
      yield chunk unless chunks_depleted?
    end
  end

  # Implements IO#read semantics. Without arguments it retrieves and returns
  # all remaining content, encoded in #encoding.
  #
  # With `length` argument returns exactly that number of bytes (binary) if
  # they're available, fewer at the end of the stream.
  #
  # With `outbuf` argument the content is written into (and returned as) that
  # same string object.
  #
  # If end of file is reached, returns an empty string if called without
  # `length` (or with 0), or nil if called with a positive `length`.
  # Raises IOError if closed, ArgumentError for a negative `length`.
  def read(length = nil, outbuf = nil)
    raise IOError, 'closed stream' if closed?
    raise ArgumentError, "negative length #{length} given" if length&.negative?

    data = outbuf ? outbuf.clear.force_encoding(Encoding::BINARY) : ''.b
    scratch = ''.b # reused outbuf for readpartial to limit allocations
    until (length && data.bytesize >= length) || eof?
      remaining_length = length - data.bytesize if length
      data << readpartial(remaining_length, scratch)
    end
    scratch.clear

    return nil if length&.positive? && data.empty?

    data.force_encoding(@encoding) unless length
    data
  end

  # Implements IO#gets semantics. Without arguments it returns the next line
  # of content, including its trailing separator ($/, normally "\n").
  #
  # `separator_or_limit` may be:
  #
  # * a nonempty string: returns content up to and including that sequence
  #   of bytes
  # * an empty string: paragraph mode, content up to and including "\n\n"
  # * nil: no separator; returns all remaining content (subject to `limit`)
  # * an Integer: used as `limit` with the default separator $/
  #
  # With `limit` returns at most that many bytes; a negative limit means no
  # limit and 0 returns an empty string.
  #
  # Returns nil if end of file is reached. Raises IOError if closed.
  def gets(separator_or_limit = $/, limit = nil)
    raise IOError, 'closed stream' if closed?

    if separator_or_limit.is_a?(Integer)
      separator = $/
      limit = separator_or_limit
    else
      separator = separator_or_limit
    end
    limit = nil if limit&.negative?
    return ''.b.force_encoding(@encoding) if limit&.zero?

    separator = "\n\n" if separator&.empty?
    # Buffered data is binary; a binary separator keeps include?/index free of
    # encoding-compatibility errors and makes index a byte offset.
    separator &&= separator.b

    data = ''.b
    scratch = ''.b
    until (separator && data.include?(separator)) || data.bytesize == limit || eof?
      remaining_length = limit - data.bytesize if limit
      data << readpartial(remaining_length, scratch)
    end
    scratch.clear

    return nil if data.empty?

    if separator && (index = data.index(separator))
      line_end = index + separator.bytesize
      # Bytes read past the separator belong to the next call.
      unread(data.byteslice(line_end..-1))
      data = data.byteslice(0, line_end)
    end
    data.force_encoding(@encoding)
  end

  # Implements IO#readpartial semantics. If there is any content readily
  # available reads from it, otherwise fetches the next non-empty chunk from
  # the enumerator first.
  #
  # With `maxlen` argument returns a maximum of that amount of bytes (default
  # is 16KB); 0 returns an empty string.
  #
  # With `outbuf` argument the content is written into (and returned as) that
  # same string object.
  #
  # Always returns a binary String. Raises EOFError if end of file is reached,
  # IOError if closed, ArgumentError for a negative `maxlen`.
  def readpartial(maxlen = nil, outbuf = nil)
    raise IOError, 'closed stream' if closed?

    maxlen ||= 16 * 1024
    raise ArgumentError, "negative length #{maxlen} given" if maxlen.negative?

    data = outbuf ? outbuf.clear.force_encoding(Encoding::BINARY) : ''.b
    return data if maxlen.zero?
    raise EOFError, 'end of file reached' unless fill_buffer

    if maxlen < @buffer.bytesize
      data << @buffer.byteslice(0, maxlen)
      @buffer = @buffer.byteslice(maxlen..-1)
    else
      consumed = @buffer
      @buffer = nil
      data << consumed
      consumed.clear # our private copy (see fill_buffer); release it now
    end
    @position += data.bytesize
    data
  end

  # Implements IO#seek semantics: the stream is not seekable, so this always
  # raises Errno::ESPIPE.
  def seek(_amount, _whence = IO::SEEK_SET)
    raise Errno::ESPIPE, 'Illegal seek'
  end

  # Implements IO#pos semantics. Returns the number of bytes returned so far
  # by #read, #readpartial and #gets.
  def pos
    @position
  end
  alias tell pos

  # Implements IO#eof? semantics. Returns whether all content has been
  # consumed. Like IO#eof?, it may fetch the next chunk from the enumerator
  # to find out. Raises IOError if closed.
  def eof?
    raise IOError, 'closed stream' if closed?

    !fill_buffer
  end

  # Implements IO#rewind semantics for a non-rewindable stream: always raises
  # IOError ('closed stream' if closed, 'not rewindable' otherwise).
  def rewind
    raise IOError, 'closed stream' if closed?
    raise IOError, 'not rewindable'
  end

  # Implements IO#close semantics. Drops buffered content and marks the IO as
  # closed; subsequent reads raise IOError. Calling it again is a no-op.
  def close
    return if @closed

    @buffer = nil
    @closed = true
  end

  # Returns whether the EnumeratorIO has been closed.
  def closed?
    !!@closed
  end

  # Returns useful information about the EnumeratorIO object.
  def inspect
    string = String.new
    string << "#<#{self.class.name}"
    string << " enum=#{@enum.inspect}"
    string << " size=#{size.inspect}"
    string << " encoding=#{encoding.inspect}"
    string << ' (closed)' if closed?
    string << '>'
  end

  private

  # Ensures @buffer holds at least one unread byte, pulling chunks from the
  # enumerator as needed. Each chunk is copied into a binary String, so the
  # caller's strings are never mutated and byte slicing/splicing never hits
  # encoding conflicts; nil and empty chunks are skipped.
  # Returns true if buffered content is available, false at end of stream.
  def fill_buffer
    until @buffer || chunks_depleted?
      chunk = retrieve_chunk.to_s
      @buffer = chunk.b unless chunk.empty?
    end
    !@buffer.nil?
  end

  # Pushes bytes that were read but not returned back in front of the buffer
  # and moves @position back by the same amount.
  def unread(bytes)
    return if bytes.empty?

    @position -= bytes.bytesize
    @buffer = @buffer ? @buffer.prepend(bytes) : bytes
  end

  # Returns the next element of the enumerator, or nil (marking the
  # enumerator as depleted) once it raises StopIteration.
  def retrieve_chunk
    @enum.next
  rescue StopIteration
    @chunks_depleted = true
    nil
  end

  # Returns whether the enumerator has been found to be exhausted.
  def chunks_depleted?
    @chunks_depleted
  end

  # Finds encoding by name. If the encoding couldn't be found, falls back to
  # the generic binary encoding.
  def find_encoding(encoding)
    Encoding.find(encoding)
  rescue ArgumentError
    Encoding::BINARY
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment