@billdueber
Last active January 13, 2022 17:18
Simple RSolr extension to use a cursor-based stream to iterate over docs in a solr core
# frozen_string_literal: true

# # Simple example -- get ids and titles of all items without an author
#
# rsolr = RSolr.connect(url: 'http://localhost:8025/solr/catalog')
# stream = rsolr.streamer(handler: 'select') do |s|
#   s.filter = 'NOT author:[* TO *]'
#   s.sort = 'id asc'
#   s.fields = ['id', 'title']
#   s.batch_size = 2_000
# end
#
# File.open("no_author.tsv", "w:utf-8") do |outfile|
#   stream.each do |doc|
#     outfile.puts [doc['id'], doc['title']].join("\t")
#   end
# end
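
require "rsolr"
require "delegate"

module RSolr
  class Client
    # Build a Streamer bound to this client. The block, if given, is forwarded so it can
    # configure the new Streamer (as in the example above); Ruby won't pass it on implicitly.
    def streamer(**kwargs, &block)
      Streamer.new(rsolr: self, **kwargs, &block)
    end
  end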

  # Cursor-based streamer that iterates through documents from a Solr core (optionally filtered)
  # in a fixed sort order rather than by relevance rank. It avoids the usual cost of deep paging
  # with `start`/`rows`, where every request makes Solr collect and sort all documents up to the
  # requested offset. Note that a cursor can't jump straight to a deep page; it only pays off when
  # pulling a long run of documents from Solr in order.
  class Streamer
    include Enumerable

    attr_accessor :handler, :filter, :sort, :batch_size, :rsolr, :fields

    # @param [RSolr::Client] rsolr An underlying RSolr client pointing to a core, created however you want
    # @param [String] handler The request handler to target
    # @param [String] filter The filter query (fq) to apply before streaming
    # @param [String] sort A Solr sort; it must include the core's uniqueKey field (e.g. "id asc")
    # @param [Integer] batch_size How many documents to pull from Solr at once when streaming
    # @param [Array<String>] fields Which fields to return with each document. Empty (the default) means all fields
    # @yieldparam [RSolr::Streamer] streamer The new object, so it can be configured in a block
    def initialize(rsolr:, handler: "select", filter: "*:*", sort: "id asc", batch_size: 1000, fields: [])
      @rsolr = rsolr
      @handler = handler
      @filter = filter
      @batch_size = batch_size
      @sort = sort
      @last_cursor = nil
      @current_cursor = "*"
      @fields = fields
      yield self if block_given?
    end
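
    # Iterate through the documents in the stream. Behind the scenes, they are fetched in batches
    # of `batch_size` for efficiency. Each call starts from a fresh cursor, so the stream can be
    # iterated more than once (and a partial iteration such as #first doesn't skew the next one).
    # @yieldparam [Hash] doc A single Solr document in the stream
    def each
      return enum_for(:each) unless block_given?

      verify_we_have_everything!
      @last_cursor = nil
      @current_cursor = "*"
      while solr_has_more?
        cursor_response = get_page
        cursor_response.docs.each { |d| yield d }
      end
    end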

    # @private
    # @return [Hash] Default solr params derived from instance variables
    def default_params
      fl = Array(fields).join(",")
      params = {q: "*:*", wt: :ruby, rows: @batch_size, sort: @sort, fq: @filter, fl: fl}
      params.reject { |_k, v| [nil, "", []].include?(v) }
    end

    # @private
    # Make sure we have everything we need for a successful stream (a cursor also requires a sort)
    def verify_we_have_everything!
      required = {rsolr: rsolr, handler: handler, filter: filter, sort: sort, batch_size: batch_size}
      missing = required.select { |_k, v| v.nil? }.keys
      raise "RSolr::Streamer missing value for #{missing.join(", ")}" unless missing.empty?
    end

    # @private
    # Get a single "page" (`batch_size` documents) from Solr and advance the cursor. Feeds into #each
    # @return [CursorResponse]
    def get_page
      # default_params already carries `fl` (or omits it when no fields were requested)
      params = default_params.merge(cursorMark: @current_cursor)
      resp = CursorResponse.new(@rsolr.get(@handler, params: params))
      @last_cursor = @current_cursor
      @current_cursor = resp.cursor
      resp
    end
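
    # @private
    # Determine if Solr has another page of results. Solr signals the end of a cursor by
    # returning the same nextCursorMark that was sent, so we stop once the cursor stops moving.
    # @return [Boolean]
    def solr_has_more?
      @last_cursor != @current_cursor
    end
  end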

  # Utility wrapper around an RSolr response; anything not defined here is delegated to the response
  class CursorResponse < SimpleDelegator
    def initialize(rsolr_response)
      super # SimpleDelegator stores rsolr_response as the delegate
      @resp = rsolr_response
    end

    def docs
      @resp["response"]["docs"]
    end

    def num_found
      @resp["response"]["numFound"]
    end

    def cursor
      @resp["nextCursorMark"]
    end
  end
end
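
Why the block given to `rsolr.streamer` has to be forwarded explicitly: in Ruby, a block passed to one method is not handed on to the methods it calls, so unless `Client#streamer` captures it with `&block` and passes it to `Streamer.new`, the configuration block in the usage example never reaches `Streamer#initialize`. The sketch below isolates that behavior with hypothetical stand-in classes (`ConfigurableStreamer`, `ClientWithoutForwarding`, `ClientWithForwarding`); it needs neither RSolr nor a running Solr.

```ruby
# Hypothetical stand-ins (not RSolr classes) reduced to one detail: whether a
# configuration block given to #streamer ever reaches #initialize.
class ConfigurableStreamer
  attr_accessor :sort, :batch_size

  def initialize(sort: "id asc", batch_size: 1000)
    @sort = sort
    @batch_size = batch_size
    yield self if block_given?
  end
end

class ClientWithoutForwarding
  # Ruby does not pass this method's block on to .new implicitly,
  # so block_given? is false inside ConfigurableStreamer#initialize.
  def streamer(**kwargs)
    ConfigurableStreamer.new(**kwargs)
  end
end

class ClientWithForwarding
  # Capturing the block as &block and passing it on explicitly lets
  # ConfigurableStreamer#initialize yield to it.
  def streamer(**kwargs, &block)
    ConfigurableStreamer.new(**kwargs, &block)
  end
end
```

With these stand-ins, `ClientWithoutForwarding.new.streamer { |s| s.batch_size = 5 }.batch_size` stays at the default of 1000, while the forwarding version returns 5.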
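
The termination rule in `solr_has_more?` follows Solr's cursorMark protocol: the first request sends `*`, each response carries a `nextCursorMark`, and the stream is finished when Solr hands back the same mark that was sent. Here is a minimal sketch of that loop against a hypothetical in-memory stand-in (`FakeCursorSolr`, with a hypothetical `stream_ids` driver) that uses the last returned id as its cursor. Real Solr cursor marks are opaque strings, so this models the protocol only, not Solr's encoding.

```ruby
# Sketch of Solr's cursorMark protocol. FakeCursorSolr is a hypothetical
# in-memory stand-in, not part of RSolr; it uses the last returned id as the
# cursor, whereas real Solr returns opaque cursor strings.
class FakeCursorSolr
  attr_reader :requests

  def initialize(ids)
    @ids = ids.sort # cursors need a total order on a unique key
    @requests = 0
  end

  # Returns up to `rows` docs after `cursor_mark` ("*" means from the start),
  # shaped like a Solr response, plus the nextCursorMark.
  def get(cursor_mark, rows)
    @requests += 1
    start = cursor_mark == "*" ? 0 : (@ids.index { |id| id > cursor_mark } || @ids.size)
    page = @ids[start, rows]
    # An empty page means the end: hand back the same cursor we were given.
    next_mark = page.empty? ? cursor_mark : page.last
    {"response" => {"docs" => page.map { |id| {"id" => id} }}, "nextCursorMark" => next_mark}
  end
end

# Same loop shape as Streamer#each and #solr_has_more?: keep fetching until
# the cursor stops moving, yielding each document's id along the way.
def stream_ids(solr, rows)
  last_cursor = nil
  current_cursor = "*"
  while last_cursor != current_cursor
    resp = solr.get(current_cursor, rows)
    last_cursor = current_cursor
    current_cursor = resp["nextCursorMark"]
    resp["response"]["docs"].each { |doc| yield doc["id"] }
  end
end
```

With 25 ids and 10 rows per page this makes four requests: three that return documents and a final one that returns none along with the unchanged cursor. That extra empty round trip is the price of not having to trust `numFound`.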
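
The parameters each page request sends can be sketched as a plain function. `cursor_params` is a hypothetical helper that mirrors `default_params` with the `cursorMark` merged in; it shows that an empty field list drops `fl` entirely, leaving Solr to apply its default field list, and that a `nil` filter simply omits `fq`.

```ruby
# Hypothetical helper mirroring Streamer#default_params plus the cursorMark.
# Empty or nil values are dropped so Solr falls back to its own defaults.
def cursor_params(cursor_mark:, batch_size: 1000, sort: "id asc", filter: "*:*", fields: [])
  params = {
    q: "*:*",
    wt: :ruby,
    rows: batch_size,
    sort: sort, # must include the uniqueKey field for cursors to work
    fq: filter,
    fl: Array(fields).join(","), # [] becomes "", which is then dropped
    cursorMark: cursor_mark
  }
  params.reject { |_key, value| [nil, ""].include?(value) }
end
```

For instance, `cursor_params(cursor_mark: "*", fields: %w[id title])[:fl]` is `"id,title"`, while `cursor_params(cursor_mark: "*")` has no `:fl` key at all.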
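
`CursorResponse` leans on `SimpleDelegator`: the `super` call in its `initialize` already makes the wrapped response the delegate, so anything not defined on the wrapper (such as `[]` or `keys`) falls through to the underlying response. A hypothetical `PageResponse` shows the same pattern reading the delegate via `__getobj__` instead of a separate instance variable; a plain Hash stands in for the RSolr response here.

```ruby
require "delegate"

# Hypothetical wrapper following the same pattern as CursorResponse.
# SimpleDelegator#initialize (inherited, so no custom constructor is needed)
# stores the wrapped object, and __getobj__ returns it.
class PageResponse < SimpleDelegator
  def docs
    __getobj__["response"]["docs"]
  end

  def num_found
    __getobj__["response"]["numFound"]
  end

  def cursor
    __getobj__["nextCursorMark"]
  end
end
```

Either style works; keeping `@resp` as `CursorResponse` does is a matter of taste, but the extra `__setobj__` call is unnecessary once `super` has run.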