Created
May 13, 2022 12:00
-
-
Save fractaledmind/7c60bb5cf39c44a2533f765036f13f3f to your computer and use it in GitHub Desktop.
Generalized auto_paginate method to work with GitHub, Jira, and Slack APIs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "httpx" | |
# Mixin that gives an API client an HTTPX session and a generic, lazily
# evaluated pagination loop (#auto_paginate).
#
# Including classes customise pagination by overriding the five
# +pagination_*+ hooks below; each hook raises NotImplementedError until it
# is overridden.
module Clientable
  extend ActiveSupport::Concern

  # Builds the HTTPX session used for every request, with the :retries
  # plugin enabled (retry_change_requests: true) and max_retries set to 3.
  def initialize
    @httpx = HTTPX
      .plugin(:retries, retry_change_requests: true)
      .max_retries(3)
  end

  # Walks every page of +url+ and yields each item found.
  #
  # @param url [String] endpoint to request
  # @param values_key [String, nil] forwarded to #pagination_fetch_contents
  #   so it can locate the items inside the parsed response
  # @param options [Hash] base request options; the page-specific options
  #   are layered on top by #pagination_options
  # @return [Enumerator] yields items one by one; pages are requested only
  #   as the enumerator is consumed
  # @raise whatever +response.raise_for_status+ raises for a failed request
  def auto_paginate(url, values_key = nil, options = {})
    Enumerator.new do |yielder|
      current_point = pagination_starting_point

      loop do
        current_page_options = pagination_options(options, current_point)
        # Splat the options so they reach HTTPX as keyword arguments; a
        # positional Hash is not treated as keywords on Ruby 3.
        response = @httpx.get(url, **current_page_options)
        response.raise_for_status

        results = pagination_parse_dataset(response.body.to_s)
        pagination_fetch_contents(results, values_key).each { |item| yielder << item }

        # Hooks signal "no more pages" by returning the StopIteration class.
        current_point = pagination_next_point(current_point, response, results)
        break if current_point == StopIteration
      end
    end
  end

  # Hook: the pagination point (page number, offset, cursor, ...) used for
  # the first request.
  def pagination_starting_point
    raise NotImplementedError, "#{self.class} must implement #{__method__}"
  end

  # Hook: returns the request options for the page at +current_point+,
  # built on top of the caller-supplied +options+.
  def pagination_options(options, current_point)
    raise NotImplementedError, "#{self.class} must implement #{__method__}"
  end

  # Hook: turns the raw response body string into a parsed dataset.
  # Takes a single argument, matching how #auto_paginate invokes it.
  def pagination_parse_dataset(response_body)
    raise NotImplementedError, "#{self.class} must implement #{__method__}"
  end

  # Hook: extracts the enumerable list of items from the parsed dataset.
  def pagination_fetch_contents(results, values_key)
    raise NotImplementedError, "#{self.class} must implement #{__method__}"
  end

  # Hook: returns the next pagination point, or the StopIteration class
  # when there are no further pages.
  def pagination_next_point(current_point, response, results)
    raise NotImplementedError, "#{self.class} must implement #{__method__}"
  end
end
module GitHubAPI
  # Client whose pagination hooks use a numeric +page+ parameter and the
  # response's +Link+ header.
  class BaseClient
    include Clientable

    attr_reader :base_url

    def initialize
      super()
      @base_url = "https://api.github.com/"
    end

    private

    # Pagination starts at page 1.
    def pagination_starting_point
      1
    end

    # Adds the current page number to the request's query params.
    def pagination_options(options, current_point)
      options.deep_merge(params: {page: current_point})
    end

    # Response bodies are parsed as JSON.
    def pagination_parse_dataset(response_body)
      JSON.parse(response_body)
    end

    # Returns the list of items for the page.
    #
    # Array payloads are returned as-is. When the payload is a Hash and a
    # +values_key+ was given, the items stored under that key are returned
    # (an empty list if the key is absent) instead of iterating the Hash's
    # key/value pairs.
    def pagination_fetch_contents(results, values_key)
      return results if values_key.nil? || !results.is_a?(Hash)

      results.fetch(values_key, [])
    end

    # Advances to the next page number while the Link header advertises a
    # rel="next" link; otherwise signals the end with StopIteration.
    def pagination_next_point(current_point, response, _results)
      # +to_s+ covers a missing Link header (single or last page).
      link_header = response.headers["Link"].to_s

      if link_header.match?(/rel="next"/)
        current_point + 1
      else
        StopIteration
      end
    end
  end
end
module JiraAPI
  # Client whose pagination hooks use the offset-style fields
  # +startAt+ / +maxResults+ / +total+ / +isLast+ from the response body.
  class BaseClient
    include Clientable

    attr_reader :base_url

    private

    # Pagination is offset based, so the first page starts at offset 0
    # rather than sending an empty +startAt+ parameter.
    def pagination_starting_point
      0
    end

    # Adds the current offset to the request's query params.
    def pagination_options(options, current_point)
      options.deep_merge(params: {startAt: current_point})
    end

    # Response bodies are parsed as JSON.
    def pagination_parse_dataset(response_body)
      JSON.parse(response_body)
    end

    # Returns the whole dataset when no +values_key+ is given, otherwise the
    # list stored under +values_key+ (an empty list if the key is absent).
    def pagination_fetch_contents(results, values_key)
      return results if values_key.nil?

      results.fetch(values_key, [])
    end

    # Computes the offset of the next page, or StopIteration when done.
    #
    # Stops when the payload is a bare Array (no paging metadata), when
    # +isLast+ is set, when the page size is not positive, or when the next
    # offset reaches +total+. If +total+ is absent, pagination continues
    # only when the response explicitly carries a falsy +isLast+.
    def pagination_next_point(_current_point, _response, results)
      return StopIteration if results.is_a?(Array)
      return StopIteration if results.fetch("isLast", false)

      page_size = results.fetch("maxResults", 0)
      # A non-positive page size would never advance the offset and would
      # request the same page forever.
      return StopIteration unless page_size.positive?

      next_page_cursor = results.fetch("startAt", 0) + page_size
      total = results["total"]

      if total
        next_page_cursor >= total ? StopIteration : next_page_cursor
      elsif results.key?("isLast")
        # +isLast+ is present and falsy (checked above): more pages remain
        # even though no +total+ was reported.
        next_page_cursor
      else
        StopIteration
      end
    end
  end
end
module SlackAPI
  # Client bound to a single installation; authenticates every request with
  # that installation's app token and paginates with a cursor read from
  # +response_metadata.next_cursor+.
  class InstallationClient
    include Clientable

    attr_reader :base_url

    # @param installation [#app_token] source of the bearer token
    def initialize(installation)
      super()
      @installation = installation
      @base_url = "https://slack.com/api/"

      # Layer bearer authentication on top of the session built by Clientable.
      authenticated = @httpx.plugin(:authentication)
      @httpx = authenticated.authentication("Bearer #{@installation.app_token}")
    end

    private

    # The first request is made without a cursor value.
    def pagination_starting_point
      nil
    end

    # Adds the current cursor to the request's query params.
    def pagination_options(options, current_point)
      cursor_params = {params: {cursor: current_point}}
      options.deep_merge(cursor_params)
    end

    # Response bodies are parsed as JSON.
    def pagination_parse_dataset(response_body)
      JSON.parse(response_body)
    end

    # Returns the list stored under +values_key+, or an empty list when the
    # key is absent.
    def pagination_fetch_contents(results, values_key)
      results.fetch(values_key, [])
    end

    # Returns the next cursor, or StopIteration when the response carries a
    # blank (missing or empty) +next_cursor+.
    def pagination_next_point(_current_point, _response, results)
      upcoming_cursor = results.dig("response_metadata", "next_cursor")
      return StopIteration if upcoming_cursor.blank?

      upcoming_cursor
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment