Created
February 25, 2015 17:33
-
-
Save billdueber/4e35e167e9bb9b5ad3a7 to your computer and use it in GitHub Desktop.
Threaded MARC reads
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'concurrent' | |
require 'thread' | |
require 'json' | |
require 'stringio' | |
require 'marc' | |
require 'benchmark' | |
# OK. Try to read in a raw record, have another thread transform (parse) it, and
# expose the parsed records via #each.
#
#
# On my 4-core machine, after warmup, JRuby gets about a 40% speedup for binary MARC (forgiving)
# and 60% for ndj/marc-in-json. In the MRI (ruby 2.2.0) rehearsal below, the threaded versions are slower.
# | |
# jruby 1.7.18 (1.9.3p551) 2014-12-22 625381c on Java HotSpot(TM) 64-Bit Server VM 1.8.0-b132 +jit [darwin-x86_64] | |
# | |
# | |
# Rehearsal ---------------------------------------------------------- | |
# serial bin forgiving 101.990000 0.960000 102.950000 ( 54.077000) | |
# threaded bin forgiving 162.660000 2.680000 165.340000 ( 47.076000) | |
# serial ndj 42.710000 0.480000 43.190000 ( 37.470000) | |
# threaded ndj 76.130000 2.410000 78.540000 ( 23.115000) | |
# ----------------------------------------------- total: 390.020000sec | |
# | |
# user system total real | |
# serial bin forgiving 118.740000 0.560000 119.300000 ( 59.470000) | |
# threaded bin forgiving 146.330000 2.270000 148.600000 ( 41.783000) | |
# serial ndj 39.850000 0.440000 40.290000 ( 36.368000) | |
# threaded ndj 74.520000 2.280000 76.800000 ( 22.400000) | |
# ruby 2.2.0p0 (2014-12-25 revision 49005) [x86_64-darwin13] | |
# | |
# | |
# Rehearsal ---------------------------------------------------------- | |
# serial bin forgiving 64.730000 0.280000 65.010000 ( 65.173811) | |
# threaded bin forgiving 87.780000 4.670000 92.450000 ( 90.267687) | |
# serial ndj 70.950000 1.200000 72.150000 ( 72.429602) | |
# threaded ndj 96.010000 5.600000 101.610000 ( 98.965390) | |
# ----------------------------------------------- total: 331.220000sec | |
require 'concurrent' | |
require 'thread' | |
# Reads raw records from any "eachable" source and parses them on background
# promises, exposing the parsed records in source order through #each.
#
# A producer (#fill_queue, started through Concurrent::Async) pushes one
# promise per raw record onto a bounded queue; #each pops the promises in the
# same order and yields their values. Subclasses override #each_raw_record
# (how to split the source into raw records) and #process_raw_record (how to
# parse one raw record).
class ThreadedReader
  include Enumerable
  include Concurrent::Async

  # @param eachable [#each] source of raw records (the base class calls #each on it)
  # @param queue_size [Integer] maximum number of promises held in the queue
  #   before the producer blocks; defaults to the original hard-coded 4
  def initialize(eachable, queue_size = 4)
    # NOTE(review): init_mutex is not defined in this file; presumably it comes
    # from Concurrent::Async in the concurrent-ruby release this was written
    # against. Skip the call when the installed version does not provide it,
    # rather than failing with NoMethodError.
    init_mutex if respond_to?(:init_mutex, true)
    @eachable = eachable
    @cache = SizedQueue.new(queue_size)
  end

  # Yields each raw (unparsed) record from the source.
  def each_raw_record
    @eachable.each { |rr| yield rr }
  end

  # Turns one raw record into its parsed form; the base class parses JSON.
  #
  # @param rr [String] raw record
  def process_raw_record(rr)
    JSON.parse(rr)
  end

  # Producer: queues one executing promise per raw record, then the
  # :end_of_records marker.
  #
  # The marker is pushed from `ensure` so that a source that raises part-way
  # through still terminates the consumer loop in #each instead of leaving it
  # blocked on an empty queue.
  def fill_queue
    each_raw_record do |rr|
      @cache << Concurrent::Promise.new { process_raw_record(rr) }.execute
    end
  ensure
    @cache << :end_of_records
  end

  # Yields every parsed record, in source order.
  #
  # @return [Enumerator] when called without a block
  # @raise [Exception] whatever the producer raised while reading the source,
  #   re-raised after the records queued before the failure have been yielded
  def each
    return enum_for(:each) unless block_given?

    producer = async.fill_queue
    loop do
      r = @cache.pop
      break if r == :end_of_records
      # NOTE(review): #value is kept as in the original; if a promise's
      # parsing raised, this presumably yields nil rather than raising —
      # confirm that is the intended behaviour.
      yield r.value
    end

    # The marker is queued before the async call itself has finished, so wait
    # for it to settle, then surface a producer failure instead of silently
    # returning a truncated record stream.
    producer.wait
    raise producer.reason if producer.rejected?
  end
end
# ThreadedReader variant whose raw records are lines of JSON, each of which is
# turned into a MARC::Record via MARC::Record.new_from_hash.
class ThreadedMIJReader < ThreadedReader
  # Yields each line of the source with its trailing line ending stripped.
  # The line is chomped in place, exactly as yielded by the source.
  def each_raw_record
    @eachable.each { |line| yield line.tap(&:chomp!) }
  end

  # Parses one JSON line and builds a MARC record from the resulting hash.
  #
  # @param rr [String] one line of JSON
  def process_raw_record(rr)
    record_hash = JSON.parse(rr)
    MARC::Record.new_from_hash(record_hash)
  end
end
# ThreadedReader variant for binary MARC: the source is split on
# MARC::END_OF_RECORD and each chunk is decoded with the :forgiving option.
class ThreadedMarcFogivingReader < ThreadedReader
  # Yields each chunk of the source, using MARC::END_OF_RECORD as the
  # separator passed to #each_line.
  def each_raw_record
    @eachable.each_line(MARC::END_OF_RECORD) do |chunk|
      yield chunk
    end
  end

  # Decodes one raw chunk through MARC::Reader.decode with :forgiving => true.
  #
  # @param rr [String] one raw chunk as yielded by #each_raw_record
  def process_raw_record(rr)
    decode_options = { :forgiving => true }
    MARC::Reader.decode(rr, decode_options)
  end
end
# Benchmark driver: for each input file, compare a serial read with the
# corresponding threaded reader. Every report collects the 245 field of each
# record so that the parsed records are actually touched.
ndjfilename = '150k_marc.ndj'
binfilename = '150k.mrc'

puts RUBY_DESCRIPTION

Benchmark.bmbm do |x|
  x.report('serial bin forgiving') do
    titles = []
    MARC::ForgivingReader.new(binfilename, :permissive=>true).each do |r|
      titles << r['245']
    end
  end

  x.report('threaded bin forgiving') do
    titles = []
    File.open(binfilename, 'r:binary') do |mrcfile|
      ThreadedMarcFogivingReader.new(mrcfile).each do |r|
        titles << r['245']
      end
    end
  end

  puts "\n"

  x.report('serial ndj') do
    titles = []
    # Block form so the handle is closed after each run (bmbm runs every
    # report twice); the original left the file open.
    File.open(ndjfilename, 'r:utf-8') do |ndjfile|
      ndjfile.each do |rr|
        rr.chomp!
        titles << MARC::Record.new_from_hash(JSON.parse(rr))['245']
      end
    end
  end

  x.report('threaded ndj') do
    titles = []
    # Block form here too: the reader's #each drains the whole file before
    # returning, so closing the handle at the end of the block is safe.
    File.open(ndjfilename, 'r:utf-8') do |ndjfile|
      ThreadedMIJReader.new(ndjfile).each do |r|
        titles << r['245']
      end
    end
  end

  puts "\n"
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment