Last active
January 13, 2022 17:18
-
-
Save billdueber/3f83740029600b5fc5fbfc020f30a65f to your computer and use it in GitHub Desktop.
Simple RSolr extension to use a cursor-based stream to iterate over docs in a solr core
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
# # Simple example -- get ids and titles of all items without an author | |
# | |
# rsolr = RSolr.connect(url: 'http://localhost:8025/solr/catalog') | |
# stream = rsolr.streamer(handler: 'select') do |s| | |
# s.filter = 'NOT author:[* TO *]' | |
# s.sort = 'id asc' | |
# s.fields = ['id', 'title'] | |
# s.batch_size = 2_000 | |
# end | |
# File.open("no_author.tsv", "w:utf-8") do |outfile| | |
# stream.each do |doc| | |
# outfile.puts [doc['id'], doc['title']].join("\t") | |
# end | |
# end | |
require "rsolr" | |
require "delegate" | |
module RSolr | |
class Client
  # Build a cursor-based Streamer that uses this client for its requests.
  #
  # Any block given is forwarded to Streamer.new, so the streamer can be
  # configured in block form:
  #
  #   client.streamer(handler: "select") { |s| s.batch_size = 2_000 }
  #
  # @param kwargs [Hash] keyword options passed straight through to Streamer.new
  # @yieldparam streamer [Streamer] the newly built streamer, for configuration
  # @return [Streamer]
  def streamer(**kwargs, &block)
    # Without explicitly passing the block along, Streamer#initialize would
    # never see it and the caller's configuration would be silently dropped.
    Streamer.new(rsolr: self, **kwargs, &block)
  end
end
# Cursor-based streamer to iterate through non-ranked documents, possibly filtered, from a solr core
# without the usual problems associated with deep paging. Note that this can't efficiently get you
# straight to a deep page; it only has benefits when pulling a long set of documents from solr.
class Streamer
  include Enumerable

  attr_accessor :handler, :filter, :sort, :batch_size, :rsolr, :fields

  # @param rsolr [#get] an underlying rsolr object pointing to a core, created however you want;
  #   it is called as `rsolr.get(handler, params: {...})`
  # @param handler [String] The handler to target
  # @param filter [String] The filter to apply before streaming (sent as `fq`)
  # @param sort [String] A solr sort; must be on a unique index
  # @param batch_size [Integer] How many documents to pull from solr at once when streaming
  # @param fields [Array<String>] Which fields to return with each document. Default is "all"
  # @yieldparam streamer [RSolr::Streamer] the new object, for block-style configuration
  def initialize(rsolr:, handler: "select", filter: "*:*", sort: "id asc", batch_size: 1000, fields: [])
    @rsolr = rsolr
    @handler = handler
    @filter = filter
    @batch_size = batch_size
    @sort = sort
    @fields = fields
    reset_cursor
    yield self if block_given?
  end

  # Iterate through the documents in the stream. Behind the scenes, these will be fetched in batches
  # of `batch_size` for efficiency. Every call starts again from the beginning of the result set, so
  # the streamer can be enumerated more than once.
  # @yieldparam doc [Hash] A single solr document in the stream
  # @return [Enumerator] when no block is given
  # @raise [RuntimeError] if a required setting is missing
  def each
    return enum_for(:each) unless block_given?

    verify_we_have_everything!
    # Start from the first page; otherwise a previous full pass leaves the
    # cursors equal and this call would yield nothing.
    reset_cursor
    while solr_has_more?
      page = get_page
      page.docs.each { |doc| yield doc }
    end
  end

  # @private
  # @return [Hash] Default solr params derived from instance variables, with nil/empty values dropped
  def default_params
    params = {
      q: "*:*",
      wt: :ruby,
      rows: @batch_size,
      sort: @sort,
      fq: @filter,
      fl: Array(fields).join(",")
    }
    params.reject { |_name, value| [nil, "", []].include?(value) }
  end

  # @private
  # Make sure we have everything we need for a successful stream
  # @raise [RuntimeError] naming every missing setting
  def verify_we_have_everything!
    required = {rsolr: rsolr, handler: handler, filter: filter, batch_size: batch_size}
    missing = required.select { |_name, value| value.nil? }.keys
    # Cursor paging needs an explicit sort; a blank one would be dropped by #default_params.
    missing << :sort if sort.nil? || sort.to_s.strip.empty?
    raise "RSolr::Streamer missing value for #{missing.join(", ")}" unless missing.empty?
  end

  # @private
  # Get a single "page" (`batch_size` documents) from solr and advance the cursor. Feeds into #each
  # @return [CursorResponse]
  def get_page
    # #default_params already carries `fl` when fields are set, and omits it when they are not.
    params = default_params.merge(cursorMark: @current_cursor)
    resp = CursorResponse.new(@rsolr.get(@handler, params: params))
    @last_cursor = @current_cursor
    @current_cursor = resp.cursor
    resp
  end

  # @private
  # Determine if solr has another page of results: true until a fetch returns the same
  # cursor that was sent.
  # @return [Boolean]
  def solr_has_more?
    @last_cursor != @current_cursor
  end

  private

  # Put the cursor state back to "nothing fetched yet", starting from the "*" cursor mark.
  def reset_cursor
    @last_cursor = nil
    @current_cursor = "*"
  end
end
# Thin delegating wrapper around an rsolr response. Everything not defined
# here is passed through to the wrapped response object.
class CursorResponse < SimpleDelegator
  # @param rsolr_response [Hash] the response to wrap; read with string keys
  def initialize(rsolr_response)
    @resp = rsolr_response
    # Handing the object to SimpleDelegator makes it the delegation target.
    super(rsolr_response)
  end

  # @return [Object] whatever the response stores under "response" => "docs"
  def docs
    body = @resp["response"]
    body["docs"]
  end

  # @return [Object] whatever the response stores under "response" => "numFound"
  def num_found
    body = @resp["response"]
    body["numFound"]
  end

  # @return [Object] the value stored under "nextCursorMark"
  def cursor
    @resp["nextCursorMark"]
  end
end
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment