Created
January 4, 2024 15:46
-
-
Save mackuba/c33c9b623590cf5a713dd86f6459ae58 to your computer and use it in GitHub Desktop.
Downloading bsky firehose events to a file and streaming them from a mock server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'skyfall' | |
AVG_EVENTS_PER_SEC = 50 | |
desc "Download a part of the firehose cache to a file" | |
task :fetch do | |
current_head = nil | |
sky = Skyfall::Stream.new(ENV['FIREHOSE'] || 'bsky.network', :subscribe_repos) | |
sky.on_message do |m| | |
current_head = m.seq | |
sky.disconnect | |
end | |
sky.connect | |
if ENV['HOURS'].to_s != '' | |
cursor = current_head - (AVG_EVENTS_PER_SEC * 3600 * ENV['HOURS'].to_f).to_i | |
else | |
cursor = 100000000 | |
end | |
start = nil | |
seq = nil | |
messages = [] | |
sky = Skyfall::Stream.new(ENV['FIREHOSE'] || 'bsky.network', :subscribe_repos, cursor) | |
sky.on_error { |e| puts e } | |
sky.on_raw_message do |m| | |
if !start | |
atp_message = Skyfall::WebsocketMessage.new(m) | |
puts atp_message.time.getlocal | |
start = atp_message.seq | |
seq = start | |
else | |
seq += 1 | |
end | |
print "Fetching messages [#{seq - start}/#{current_head - start}]\r" | |
messages << m | |
if seq == current_head | |
sky.disconnect | |
end | |
end | |
sky.connect | |
File.write(ENV['FILE'] || 'firehose.bin', Marshal.dump(messages)) | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
require 'bundler/setup' | |
require 'faye/websocket' | |
require 'rack' | |
filename = ARGV[0] | |
if filename.to_s.empty? | |
puts "Usage: #{$PROGRAM_NAME} <filename>" | |
exit 1 | |
end | |
messages = Marshal.load(File.read(filename)) | |
App = lambda do |env| | |
if Faye::WebSocket.websocket?(env) | |
ws = Faye::WebSocket.new(env) | |
ws.on :open do | |
puts "Opened connection" | |
messages.each do |msg| | |
ws.send(msg.bytes) | |
end | |
# TODO: don't close before the client finishes processing events in the buffer | |
ws.close | |
end | |
ws.on :close do | |
puts "Closed connection" | |
ws = nil | |
end | |
ws.rack_response | |
else | |
puts "Bad request" | |
[400, { 'Content-Type' => 'text/plain' }, ['Bad Request']] | |
end | |
end | |
Faye::WebSocket.load_adapter('thin') | |
Rack::Handler.default.run(App) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment