Last active
September 3, 2019 17:50
-
-
Save nz/adf2c84e0e8bb958e2bf4a6b65eb4b1a to your computer and use it in GitHub Desktop.
Dynamic time-based batch sizing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Example: stream data/books.csv into the "books" Elasticsearch index, sending
# rows in the dynamically sized batches produced by Importer.
# NOTE(review): assumes the elasticsearch gem and the Importer class are loaded
# elsewhere; this snippet does not require them. Confirm.
require 'csv'

elasticsearch_url = ENV.fetch('ELASTICSEARCH_URL', 'http://localhost:9200')
elasticsearch = Elasticsearch::Client.new(url: elasticsearch_url, trace: true)

importer = Importer.new
# Each batch of actions is sent as a single bulk request.
importer.batch_handler = lambda do |actions|
  elasticsearch.bulk(body: actions)
end
importer.start

# CSV.foreach opens the file and closes it again when iteration ends.
CSV.foreach('data/books.csv', headers: true) do |row|
  importer << {
    index: {
      _index: 'books',
      # NOTE(review): mapping types were deprecated in Elasticsearch 7; confirm the target cluster version.
      _type: 'doc',
      data: row.to_hash
    }
  }
end

# Flush the final (possibly partial) batch and wait for the worker thread.
importer.finish
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Dynamically import batches of objects.
#
# Items pushed with #<< are queued and drained by a background worker thread
# (started by #start), which passes them to #batch_handler in batches. After
# each full batch, the batch size grows or shrinks geometrically depending on
# how long the handler took. This steers towards batches that take roughly
# +duration_range+ seconds.
#
#   importer = Importer.new
#   importer.batch_handler = ->(batch) { ... }
#   importer.start
#   items.each { |item| importer << item }
#   importer.finish
class Importer
  # Callable invoked with an Array of queued items, once per batch.
  attr_accessor :batch_handler

  # batch_range    - batch sizes start at the bottom of this range and max out at the top
  # duration_range - seconds; a full batch handled faster than this range grows the
  #                  batch size, and one handled slower shrinks it
  # ratio          - growth/shrink step: the batch size is batch_range.first * ratio**n
  def initialize(batch_range = (1_000..10_000), duration_range = (0.5..1.0), ratio = 1.6)
    @batch_range = batch_range
    @duration_range = duration_range
    @ratio = ratio
    # The exponent n above; 0 starts at the bottom of batch_range.
    @ratio_factor = 0
    @queue = Queue.new
  end

  # The growth exponent, never below 0, so batches never drop under batch_range.first.
  def capped_ratio_factor
    [@ratio_factor, 0].max
  end

  # Integer number of items the next batch collects, capped at batch_range.last.
  # The value is rounded because ratio**n is a Float, and batch lengths are
  # compared against it exactly.
  def current_batch_size
    size = (@batch_range.first * @ratio**capped_ratio_factor).round
    [size, @batch_range.last].min
  end

  # Queue an item for import. Returns self so pushes can be chained.
  # Raises ArgumentError for nil, which is reserved as the end-of-input marker.
  def <<(thing)
    raise ArgumentError, 'cannot import nil (reserved as the end-of-input marker)' if thing.nil?

    @queue << thing
    self
  end

  # Signal end of input and block until the worker has handled every queued item.
  # Thread#join re-raises here any exception batch_handler raised in the worker.
  def finish
    raise 'Importer#start must be called before #finish' unless @worker_thread

    @queue << nil
    @worker_thread.join
  end

  # Block until a batch is ready, then return [sentinel, things].
  # things holds up to current_batch_size items. sentinel is the last value
  # popped, and is nil only when the end-of-input marker was reached.
  def collect_batch
    limit = current_batch_size
    thing = nil
    things = []
    loop do
      thing = @queue.pop
      break if thing.nil? # end-of-input marker pushed by #finish
      things << thing
      break if things.length >= limit
    end
    [thing, things]
  end

  # Start the worker thread that drains the queue batch by batch. Returns the Thread.
  # Raises unless batch_handler has been set to something callable.
  def start
    raise 'batch_handler must be set before calling Importer#start' unless @batch_handler.respond_to?(:call)

    @worker_thread = Thread.new do
      loop do
        sentinel, things = collect_batch
        break if sentinel.nil? && things.empty? # nothing left to do

        started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        @batch_handler.call(things)
        elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
        # Only a full batch shows how long the current size takes, so the
        # final partial batch is ignored.
        adjust_ratio_factor(elapsed) if things.length == current_batch_size
        break if sentinel.nil?
      end
    end
  end

  private

  # Step the growth exponent up after a fast batch, or down after a slow one.
  # It stops growing once the size reaches batch_range.last and stops shrinking
  # at 0. This way, time spent pinned at either end of the range does not have
  # to be unwound before the batch size reacts again.
  def adjust_ratio_factor(elapsed)
    if elapsed < @duration_range.first
      @ratio_factor += 1 if current_batch_size < @batch_range.last
    elsif elapsed > @duration_range.last
      @ratio_factor -= 1 if @ratio_factor.positive?
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment