Last active
June 21, 2024 17:36
-
-
Save jonatas/418f360d45c890e1d86c30547a0cf6a4 to your computer and use it in GitHub Desktop.
POC Make RubyGems track downloads with TimescaleDB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'bundler/inline' | |
gemfile do | |
source 'https://rubygems.org' | |
gem 'timescaledb' | |
gem 'bulk_insert' | |
gem 'pry' | |
end | |
require 'timescaledb' | |
# Model for the `downloads` hypertable (one row per gem download event), plus
# one read-only model per continuous aggregate view built on top of it.
class Download < ActiveRecord::Base
  acts_as_hypertable time_column: 'ts'

  # Named select fragments accepted by the `rollup` scope of the continuous
  # aggregate models below.
  #
  # Defined once at class level: a constant assigned inside a block belongs to
  # the lexically enclosing class, so assigning it inside the `cagg` factory
  # re-assigned Download::QUERIES (with an "already initialized constant"
  # warning) for every view model created.
  QUERIES = {
    sum_downloads: "sum(downloads)::bigint as downloads",
    avg_downloads: "avg(downloads)::bigint as avg_downloads",
    rollup: "rollup(stats_agg) as stats_agg",
    rolling_downloads: "rolling(stats_agg)->sum() as downloads",
    rollup_downloads: "rollup(stats_agg)->sum() as downloads",
    rollup_downloads_per_gem: "gem_name,rollup(stats_agg)->sum() as downloads",
    rollup_downloads_per_version: "gem_name, gem_version, rollup(stats_agg)->sum() as downloads"
  }.freeze

  # Selects the time column bucketed by `range` plus the raw SQL in `query`.
  # Both arguments are interpolated into the SQL verbatim.
  scope :time_bucket, -> (range='1m', query="count(*)") do
    select("time_bucket('#{range}', #{time_column}) as #{time_column}, #{query}")
  end

  # One-minute buckets, grouped by the bucket column.
  scope :per_minute, -> (query="count(*) as downloads") do
    time_bucket('1m', query).group(1)
  end

  # Per-minute stats_agg per gem and version.
  scope :stats_agg_per_minute, -> do
    per_minute("gem_name, gem_version, stats_agg(cast(1.0 as double precision)) as stats_agg")
      .group(1,2,3)
  end

  # Per-minute download counts per gem.
  scope :gems_per_minute, -> do
    per_minute("gem_name, count(*) as downloads").group(1,2)
  end

  # Per-minute download counts per gem and version.
  scope :versions_per_minute, -> do
    per_minute("gem_name, gem_version, count(*) as downloads").group(1,2,3)
  end

  # Factory: builds an anonymous read-only model mapped to the view
  # "downloads_<view_name>".
  cagg = -> (view_name) do
    Class.new(ActiveRecord::Base) do
      self.table_name = "downloads_#{view_name}"

      # Re-buckets `ts` by `range`. `query` is either a key of QUERIES or a
      # raw SQL select fragment, which is interpolated verbatim.
      scope :rollup, -> (range='1d', query=:sum_downloads) do
        select("time_bucket('#{range}', ts) as ts, #{QUERIES[query] || query}")
          .group(1)
      end

      scope :per_hour, -> (query=:sum_downloads) do
        rollup('1h', query)
      end

      scope :per_day, -> (query=:sum_downloads) do
        rollup('1d', query)
      end

      scope :per_week, -> (query=:sum_downloads) do
        rollup('1w', query)
      end

      scope :per_month, -> (query=:sum_downloads) do
        rollup('1mon', query)
      end

      scope :per_year, -> (query=:sum_downloads) do
        rollup('1y', query)
      end

      # Records read from the view are never written back.
      def readonly?
        true
      end

      # Refreshes the whole view (both window bounds are passed as null).
      def self.refresh!
        ActiveRecord::Base.connection.execute <<~SQL
          CALL refresh_continuous_aggregate('#{table_name}', null, null);
        SQL
      end
    end
  end

  PerMinute = cagg['per_minute']
  PerHour = cagg['per_hour']
  PerDay = cagg['per_day']
  PerMonth = cagg['per_month']

  GemsPerMinute = cagg['gems_per_minute']
  GemsPerHour = cagg['gems_per_hour']
  GemsPerDay = cagg['gems_per_day']
  GemsPerMonth = cagg['gems_per_month']

  VersionsPerMinute = cagg['versions_per_minute']
  VersionsPerHour = cagg['versions_per_hour']
  VersionsPerDay = cagg['versions_per_day']
  VersionsPerMonth = cagg['versions_per_month']

  StatsAggPerMinute = cagg['stats_agg_per_minute']
  StatsAggPerHour = cagg['stats_agg_per_hour']
  StatsAggPerDay = cagg['stats_agg_per_day']
  StatsAggPerMonth = cagg['stats_agg_per_month']
end
# Connect to the database
ActiveRecord::Base.establish_connection(ENV['DATABASE_URL'])

# Rebuild the schema from scratch: drop the aggregate views and the hypertable,
# recreate the hypertable, then create every continuous aggregate.
ActiveRecord::Base.connection.instance_exec do
  ActiveRecord::Base.logger = Logger.new(STDOUT)

  # Coarsest first; `month` is listed explicitly so those views are dropped
  # even when the cascade from a finer view does not reach them.
  %w[month day hour minute].each do |frame|
    [
      "downloads_per_#{frame}",
      "downloads_gems_per_#{frame}",
      "downloads_versions_per_#{frame}",
      "downloads_stats_agg_per_#{frame}"
    ].each do |view|
      execute("DROP MATERIALIZED VIEW IF EXISTS #{view} cascade")
    end
  end

  drop_table(:downloads, force: :cascade) if Download.table_exists?

  hypertable_options = {
    time_column: 'ts',
    chunk_time_interval: '1 day',
    compress_segmentby: 'gem_name, gem_version',
    compress_orderby: 'ts DESC',
    compression_interval: '7 days'
  }

  create_table(:downloads, id: false, hypertable: hypertable_options) do |t|
    t.timestamptz :ts, null: false
    t.text :gem_name, :gem_version, null: false
    t.jsonb :payload
  end

  # Each coarser view is built on the next finer view, so it must add up the
  # finer view's `downloads` column. `count(*)` here would count rows of the
  # finer view (buckets that had activity) instead of downloads.
  gems_rollup = "gem_name, sum(downloads)::bigint as downloads"
  versions_rollup = "gem_name, gem_version, sum(downloads)::bigint as downloads"

  {
    per_minute: Download.per_minute,
    per_hour: Download::PerMinute.per_hour(:sum_downloads).group(1),
    per_day: Download::PerHour.per_day(:sum_downloads).group(1),
    per_month: Download::PerDay.per_month(:sum_downloads).group(1),
    gems_per_minute: Download.gems_per_minute,
    gems_per_hour: Download::GemsPerMinute.per_hour(gems_rollup).group(1,2),
    gems_per_day: Download::GemsPerHour.per_day(gems_rollup).group(1,2),
    gems_per_month: Download::GemsPerDay.per_month(gems_rollup).group(1,2),
    versions_per_minute: Download.versions_per_minute,
    versions_per_hour: Download::VersionsPerMinute.per_hour(versions_rollup).group(1,2,3),
    versions_per_day: Download::VersionsPerHour.per_day(versions_rollup).group(1,2,3),
    versions_per_month: Download::VersionsPerDay.per_month(versions_rollup).group(1,2,3),
    stats_agg_per_minute: Download.stats_agg_per_minute,
    stats_agg_per_hour: Download::StatsAggPerMinute.per_hour(:rollup).group(1),
    stats_agg_per_day: Download::StatsAggPerHour.per_day(:rollup).group(1),
    stats_agg_per_month: Download::StatsAggPerDay.per_month(:rollup).group(1)
  }.each do |name, scope|
    puts "Creating continuous aggregate #{name}", scope.to_sql
    # The time unit is the suffix of the view name, e.g. :gems_per_hour -> "hour".
    frame = name.to_s.split('per_').last
    create_continuous_aggregate(
      "downloads_#{name}",
      scope.to_sql,
      refresh_policies: {
        schedule_interval: "INTERVAL '1 #{frame}'",
        start_offset: "INTERVAL '3 #{frame}'",
        end_offset: "INTERVAL '1 minute'"
      })
  end
end

# Silence SQL logging for the bulk load that follows.
ActiveRecord::Base.logger = nil
# Extracts the gem name and version from a request path such as
# "/gems/rails-7.0.2.gem".
PATH_PATTERN = /\/gems\/(?<gem_name>.*)-(?<gem_version>\d+.*)\.gem/

# Reads a log file line by line and inserts the gem downloads it contains,
# in batches, through insert_downloads.
#
# file       - path of the log file to read.
# batch_size - number of rows buffered before each insert (default 5000).
#
# Returns the result of the final insert_downloads call, or nil when no rows
# were left over after the last full batch.
def parse_file(file, batch_size: 5000)
  downloads = []
  # File.foreach closes the file when iteration ends, even on error.
  File.foreach(file) do |log_line|
    download = parse_download_line(log_line)
    next unless download

    downloads << download
    next if downloads.size < batch_size

    insert_downloads(downloads)
    downloads.clear
  end
  insert_downloads(downloads) if downloads.any?
end

# Turns one log line into a download row hash, or returns nil when the line is
# not a gem download that should be counted.
def parse_download_line(log_line)
  fragments = log_line.split
  path, response_code = fragments[10, 2]
  # Only count successful downloads
  # NB: we consider a 304 response a download attempt
  return unless [200, 304].include?(response_code.to_i)

  # Skip successful requests for anything that is not a gem file; calling
  # [] on the nil match would otherwise raise NoMethodError.
  m = PATH_PATTERN.match(path)
  return unless m

  gem_name = m[:gem_name]
  gem_version = m[:gem_version]
  ip = fragments[3]
  ts = Time.parse(fragments[4..9].join(' '))
  env = parse_env(fragments[12..-1])
  payload = { ip:, env: }
  { ts:, gem_name:, gem_version:, payload: }
end
# Parses the client/environment fragments of a log line into a Hash with
# String keys and String values.
#
# Everything from "command" onward, any parenthesised text (which includes the
# "(null)" placeholder) and the literal "Ruby, " are removed first. Each
# remaining whitespace-separated token is split once on "/" or "-".
#
# example output = ["bundler/2.5.9", "rubygems/3.3.25", "ruby/3.1.0"]
# result = {"bundler" => "2.5.9", "rubygems" => "3.3.25", "ruby" => "3.1.0"}
# A token with no separator, such as "jruby", gets the value "true":
# example output = ["jruby"]
# result = {"jruby" => "true"}
# example output = ["(null)"]
# result = {}
def parse_env(output)
  env = output.join(' ').gsub(/command.*|\(.*\)|Ruby, /, '').strip
  env.split.to_h do |info|
    name, value = info.split(/\/|-/, 2)
    [name, value || "true"]
  end
end
# Hands a batch of download row hashes to Download.bulk_insert and returns
# whatever that call returns.
def insert_downloads(downloads)
  Download.bulk_insert(values: downloads)
end
# Log files to parse and load.
s3_files = [
  "2024-04-26T00_15_00.000-szTpTn9sP1-116Ajwl4N.log"
]

# For each file, time the load and then the refresh of every aggregate view.
Benchmark.bm do |bench|
  # Built finest time frame first (PerMinute, GemsPerMinute, ... PerHour, ...),
  # so each view is refreshed before the coarser views that read from it.
  view_names = %w[Minute Hour Day Month].flat_map do |frame|
    ['', 'Gems', 'Versions', 'StatsAgg'].map { |prefix| "#{prefix}Per#{frame}" }
  end

  s3_files.each do |file|
    bench.report("parse and load #{file}") { parse_file(file) }

    view_names.each do |view|
      bench.report("Refresh #{view}") { Download.const_get(view).refresh! }
    end
  end
end

# Drop into an interactive session to explore the loaded data.
require "pry"
binding.pry
=begin | |
Download::PerHour.first | |
Download::GemsPerHour.all | |
Download::VersionsPerHour.where(gem_name: "rails").pluck(:gem_version, :downloads) # => [["1.2.3.4", 6], ["6.1.7", 1], ["7.0.2", 1]] | |
=end | |
Exploring stats_agg with grouping by gem name:
Download::StatsAggPerMinute
.select("ts, gem_name, rollup(stats_agg)->sum() as downloads")
.group(1,2).map(&:attributes)
# [{"ts"=>2024-04-26 00:11:00 UTC, "gem_name"=>"rack-session", "downloads"=>3.0},
# {"ts"=>2024-04-26 00:11:00 UTC, "gem_name"=>"aws-sdk-organizations", "downloads"=>1.0},
# {"ts"=>2024-04-26 00:11:00 UTC, "gem_name"=>"aws-sdk-comprehendmedical", "downloads"=>2.0},
# {"ts"=>2024-04-26 00:10:00 UTC, "gem_name"=>"net-pop", "downloads"=>1.0},
# {"ts"=>2024-04-26 00:10:00 UTC, "gem_name"=>"mustermann-grape", "downloads"=>1.0},
# {"ts"=>2024-04-26 00:11:00 UTC, "gem_name"=>"httpclient", "downloads"=>5.0}, ...]
Total downloads per minute across all gems and versions:
Download::StatsAggPerMinute
.select("ts, rollup(stats_agg)->sum() as downloads")
.group(1).map(&:attributes)
# => [{"ts"=>2024-04-26 00:12:00 UTC, "downloads"=>1461.0},
# {"ts"=>2024-04-26 00:14:00 UTC, "downloads"=>1127.0},
# {"ts"=>2024-04-26 00:13:00 UTC, "downloads"=>1150.0},
# {"ts"=>2024-04-26 00:15:00 UTC, "downloads"=>1005.0},
# {"ts"=>2024-04-26 00:11:00 UTC, "downloads"=>1322.0},
# {"ts"=>2024-04-26 00:10:00 UTC, "downloads"=>110.0}]
Unfolding one gem by version:
Download::StatsAggPerMinute
.where(gem_name: "rails")
.select("ts, gem_version, rollup(stats_agg)->sum() as downloads")
.group(1,2).map(&:attributes)
# => [{"ts"=>2024-04-26 00:14:00 UTC, "gem_version"=>"7.0.2", "downloads"=>1.0},
# {"ts"=>2024-04-26 00:14:00 UTC, "gem_version"=>"6.1.7", "downloads"=>1.0},
# {"ts"=>2024-04-26 00:11:00 UTC, "gem_version"=>"1.2.3.4", "downloads"=>2.0},
# {"ts"=>2024-04-26 00:15:00 UTC, "gem_version"=>"1.2.3.4", "downloads"=>1.0},
# {"ts"=>2024-04-26 00:14:00 UTC, "gem_version"=>"1.2.3.4", "downloads"=>2.0},
# {"ts"=>2024-04-26 00:12:00 UTC, "gem_version"=>"1.2.3.4", "downloads"=>1.0}]
Rolling up per hour:
Download::StatsAggPerMinute
.where(gem_name: "rails")
.per_hour("gem_version, rollup(stats_agg)->sum() as downloads")
.group(1,2).map(&:attributes)
# => [{"ts"=>2024-04-26 00:00:00 UTC, "gem_version"=>"1.2.3.4", "downloads"=>6.0},
# {"ts"=>2024-04-26 00:00:00 UTC, "gem_version"=>"6.1.7", "downloads"=>1.0},
# {"ts"=>2024-04-26 00:00:00 UTC, "gem_version"=>"7.0.2", "downloads"=>1.0}]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Cross linking the video walkthrough.
Head of the log output: