# Gist by @billdueber, created February 25, 2015 (gist id 4e35e167e9bb9b5ad3a7):
# "Threaded MARC reads"
require 'concurrent'
require 'thread'
require 'json'
require 'stringio'
require 'marc'
require 'benchmark'
# Read raw records on one thread, have other threads transform them (via promises),
# and expose the results in input order via #each.
#
#
# On my 4-core machine under JRuby (below), after warmup, you get about a 40% speedup
# for binary MARC (forgiving) and 60% for ndj/marc-in-json.
#
# jruby 1.7.18 (1.9.3p551) 2014-12-22 625381c on Java HotSpot(TM) 64-Bit Server VM 1.8.0-b132 +jit [darwin-x86_64]
#
#
# Rehearsal ----------------------------------------------------------
# serial bin forgiving 101.990000 0.960000 102.950000 ( 54.077000)
# threaded bin forgiving 162.660000 2.680000 165.340000 ( 47.076000)
# serial ndj 42.710000 0.480000 43.190000 ( 37.470000)
# threaded ndj 76.130000 2.410000 78.540000 ( 23.115000)
# ----------------------------------------------- total: 390.020000sec
#
# user system total real
# serial bin forgiving 118.740000 0.560000 119.300000 ( 59.470000)
# threaded bin forgiving 146.330000 2.270000 148.600000 ( 41.783000)
# serial ndj 39.850000 0.440000 40.290000 ( 36.368000)
# threaded ndj 74.520000 2.280000 76.800000 ( 22.400000)
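# Under MRI the threaded versions are slower than the serial ones (presumably because
# the GVL keeps the parsing threads from running in parallel):
#
# ruby 2.2.0p0 (2014-12-25 revision 49005) [x86_64-darwin13]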
#
#
# Rehearsal ----------------------------------------------------------
# serial bin forgiving 64.730000 0.280000 65.010000 ( 65.173811)
# threaded bin forgiving 87.780000 4.670000 92.450000 ( 90.267687)
# serial ndj 70.950000 1.200000 72.150000 ( 72.429602)
# threaded ndj 96.010000 5.600000 101.610000 ( 98.965390)
# ----------------------------------------------- total: 331.220000sec
require 'concurrent'
require 'thread'
# Enumerable wrapper that reads raw records from +eachable+ on a background
# thread (via Concurrent::Async) and turns each one into a result inside a
# Concurrent::Promise, so parsing can overlap with reading. Results are
# yielded by #each in input order.
#
# Subclasses override #each_raw_record (how to split the input into raw
# records) and #process_raw_record (how to turn one raw record into a result).
class ThreadedReader
  include Enumerable
  include Concurrent::Async

  # Sentinel pushed onto the queue once every raw record has been enqueued.
  END_OF_RECORDS = :end_of_records

  # @param eachable [#each] source of raw records (e.g. an open File)
  # @param queue_size [Integer] maximum number of queued promises; bounds how
  #   far the reader thread may run ahead of the consumer (default 4)
  def initialize(eachable, queue_size = 4)
    # NOTE(review): presumably required by the concurrent-ruby version this
    # targets to set up Concurrent::Async — confirm before upgrading the gem.
    init_mutex
    @eachable = eachable
    @cache = SizedQueue.new(queue_size)
  end

  # Yields each raw record from the underlying source unchanged.
  def each_raw_record
    @eachable.each { |rr| yield rr }
  end

  # Turns one raw record into a result; the default parses it as JSON.
  def process_raw_record(rr)
    JSON.parse(rr)
  end

  # Producer, run on a background thread by #each via #async. It enqueues one
  # promise per raw record, then END_OF_RECORDS. If reading the source raises,
  # the exception is enqueued instead, so #each re-raises it rather than
  # blocking forever waiting for a sentinel that never arrives.
  def fill_queue
    each_raw_record do |rr|
      @cache << Concurrent::Promise.new { rr }.then { |x| process_raw_record(x) }.execute
    end
    @cache << END_OF_RECORDS
  rescue StandardError => e
    @cache << e
  end

  # Yields each processed record, in input order. Without a block, returns an
  # Enumerator.
  #
  # @raise the exception raised while reading the source, or the reason a
  #   record's promise was rejected (e.g. a parse error)
  #
  # NOTE(review): breaking out of the block early leaves the producer blocked
  # on a full queue; each call also re-reads @eachable from its current position.
  def each
    return enum_for(:each) unless block_given?

    async.fill_queue
    loop do
      item = @cache.pop
      case item
      when END_OF_RECORDS then break
      when Exception then raise item
      end

      record = item.value # blocks until this record's promise settles
      # Promise#value returns nil on rejection; surface the error instead.
      raise item.reason if item.rejected?

      yield record
    end
  end
end
# ThreadedReader for newline-delimited marc-in-json: one JSON record per line.
class ThreadedMIJReader < ThreadedReader
  # Yields each line with its trailing line terminator removed.
  # Blank or whitespace-only lines (such as a trailing empty line) are skipped
  # because they are not valid JSON. Non-mutating #chomp is used so frozen or
  # shared input strings are left untouched.
  def each_raw_record
    @eachable.each do |rr|
      line = rr.chomp
      next if line.strip.empty?

      yield line
    end
  end

  # Parses one JSON line and builds a MARC::Record from the resulting hash.
  def process_raw_record(rr)
    MARC::Record.new_from_hash(JSON.parse(rr))
  end
end
# ThreadedReader for binary MARC input, decoded in forgiving mode.
class ThreadedMarcFogivingReader < ThreadedReader
  # Splits the input on MARC::END_OF_RECORD and yields each chunk
  # (IO#each_line keeps the separator at the end of each chunk).
  def each_raw_record
    @eachable.each_line(MARC::END_OF_RECORD) { |rr| yield rr }
  end

  # Decodes one raw binary record with MARC::Reader in forgiving mode.
  def process_raw_record(rr)
    MARC::Reader.decode(rr, forgiving: true)
  end
end

# Correctly spelled alias; the misspelled name above is kept for existing callers.
ThreadedMarcForgivingReader = ThreadedMarcFogivingReader
# Input files; optionally override with command-line arguments:
#   ruby this_script.rb [ndj_file] [binary_marc_file]
ndjfilename = ARGV.fetch(0, '150k_marc.ndj')
binfilename = ARGV.fetch(1, '150k.mrc')

puts RUBY_DESCRIPTION

# Compare serial vs. threaded reading of binary MARC and ndj/marc-in-json.
# Every report collects the 245 field of each record so the parsed records
# are actually used. File handles are opened with the block form so they are
# closed after every (rehearsal and measured) run.
Benchmark.bmbm do |x|
  x.report('serial bin forgiving') do
    titles = []
    MARC::ForgivingReader.new(binfilename, permissive: true).each do |r|
      titles << r['245']
    end
  end

  x.report('threaded bin forgiving') do
    titles = []
    File.open(binfilename, 'r:binary') do |mrcfile|
      ThreadedMarcFogivingReader.new(mrcfile).each do |r|
        titles << r['245']
      end
    end
  end

  puts "\n"

  x.report('serial ndj') do
    titles = []
    File.open(ndjfilename, 'r:utf-8') do |ndjfile|
      ndjfile.each do |rr|
        rr.chomp!
        titles << MARC::Record.new_from_hash(JSON.parse(rr))['245']
      end
    end
  end

  x.report('threaded ndj') do
    titles = []
    File.open(ndjfilename, 'r:utf-8') do |ndjfile|
      ThreadedMIJReader.new(ndjfile).each do |r|
        titles << r['245']
      end
    end
  end

  puts "\n"
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment