Skip to content

Instantly share code, notes, and snippets.

@jsvd
Last active January 20, 2020 16:34
Show Gist options
  • Save jsvd/4315e10241049ff0e55898a2f3797542 to your computer and use it in GitHub Desktop.
Save jsvd/4315e10241049ff0e55898a2f3797542 to your computer and use it in GitHub Desktop.
# encoding: utf-8
# disclaimer: this code was ripped off from Jordan Sissel's [lumberjack client](https://github.com/elastic/ruby-lumberjack/blob/master/lib/lumberjack/client.rb)
require "socket"
require "zlib"
require "json"
# Minimal blocking client for version 2 of the Lumberjack wire protocol.
# Every frame written or read here starts with the version byte "2" followed
# by a one-byte frame type ("W" window size, "C" compressed, "J" JSON, "A" ack).
module Lumberjack
  # Highest value of the 32-bit sequence counter; the counter restarts at 1
  # once the next value would exceed it.
  SEQUENCE_MAX = 2**32 - 1

  # One TCP connection to a Lumberjack receiver. Batches are written
  # synchronously: #write_sync returns only once the receiver has acknowledged
  # the last event of the batch.
  class Socket
    # @return [Integer] sequence number of the most recently built event frame
    attr_reader :sequence
    # @return [String] host name that was passed to the constructor
    attr_reader :host
    # @return [Integer] sequence number carried by the most recent ACK frame
    attr_reader :last_ack

    # Opens the connection.
    #
    # @param host [String] receiver host name or address
    # @param port [Integer] receiver TCP port
    def initialize(host, port)
      @host = host # previously never assigned, so #host always returned nil
      @sequence = 0
      @last_ack = 0
      connection_start(host, port)
    end

    # Sends a batch of events as one window and blocks until the receiver has
    # acknowledged the last event of that batch.
    #
    # @param elements [Hash, Array<Hash>] one event or a list of events; each
    #   event is serialized with JSON.dump
    # @param opts [Hash] accepted for interface compatibility; currently unused
    # @return [Integer] the acknowledged sequence number (the unchanged
    #   #last_ack when the batch is empty)
    # @raise [RuntimeError] if the receiver answers with a non-ACK frame
    # @raise [EOFError] if the connection ends in the middle of an ACK frame
    def write_sync(elements, opts = {})
      elements = [elements] if elements.is_a?(Hash)
      window = elements.size
      # Nothing to send means nothing will be acknowledged; waiting for an ACK
      # here would block on a reply that is not coming.
      return @last_ack if window.zero?

      send_window_size(window)
      frames = elements.map { |element| JsonEncoder.to_frame(element, inc) }.join
      send_payload(compress_payload(frames))
      ack(window)
    end

    # Closes the underlying connection. Calling it more than once is harmless.
    #
    # @return [nil]
    def close
      @socket.close unless @socket.closed?
      nil
    end

    private

    # Establishes the TCP connection used by every other method.
    def connection_start(host, port)
      @socket = TCPSocket.new(host, port)
    end

    # Advances the sequence counter, wrapping to 1 after SEQUENCE_MAX.
    #
    # @return [Integer] the new sequence number
    def inc
      @sequence = 0 if @sequence + 1 > Lumberjack::SEQUENCE_MAX
      @sequence += 1
    end

    # Announces how many event frames the following batch contains.
    # Goes through #send_payload so a short syswrite cannot truncate the frame.
    def send_window_size(size)
      send_payload(["2", "W", size].pack("AAN"))
    end

    # Writes the whole payload, retrying until every byte has been accepted
    # (syswrite may write fewer bytes than requested).
    def send_payload(payload)
      bytes_written = 0
      total = payload.bytesize
      while bytes_written < total
        bytes_written += @socket.syswrite(payload.byteslice(bytes_written..-1))
      end
    end

    # Deflates the concatenated event frames and wraps them in a "C" frame:
    # version, type, 32-bit big-endian length, compressed bytes.
    def compress_payload(payload)
      compressed = Zlib::Deflate.deflate(payload)
      ["2", "C", compressed.bytesize, compressed].pack("AANA*")
    end

    # Reads ACK frames until one carries the sequence number of the last event
    # frame sent. Earlier ACKs (partial acknowledgements, keep-alives such as
    # sequence 0) only update #last_ack; returning on them would report the
    # batch as delivered before the receiver confirmed it.
    #
    # @param _size [Integer] batch size; kept for call compatibility, unused
    # @return [Integer] the acknowledged sequence number
    def ack(_size)
      loop do
        _, type = read_version_and_type
        raise "Whoa we shouldn't get this frame: #{type}" if type != "A"

        @last_ack = read_last_ack
        return @last_ack if @last_ack == @sequence
      end
    end

    # Difference between the current sequence and the slot after the last ACK.
    # Not called by any other method visible in this file.
    def unacked_sequence_size
      sequence - (@last_ack + 1)
    end

    # Reads the two one-byte header fields of an incoming frame.
    #
    # @return [Array(String, String)] version and type; either is nil at EOF
    def read_version_and_type
      version = @socket.read(1)
      type = @socket.read(1)
      [version, type]
    end

    # Reads the 32-bit big-endian sequence number of an ACK frame.
    #
    # @raise [EOFError] if fewer than four bytes could be read
    def read_last_ack
      raw = @socket.read(4)
      if raw.nil? || raw.bytesize < 4
        raise EOFError, "connection closed before a complete ACK frame was received"
      end

      raw.unpack("N").first
    end
  end

  # Builds Lumberjack "J" (JSON data) frames.
  module JsonEncoder
    # Serializes one event into a JSON data frame: version "2", type "J",
    # 32-bit big-endian sequence, 32-bit big-endian payload length, payload.
    #
    # @param hash [Hash] event to serialize with JSON.dump
    # @param sequence [Integer] sequence number to stamp on the frame
    # @return [String] the binary frame
    def self.to_frame(hash, sequence)
      json = ::JSON.dump(hash)
      # "A*" packs every byte of the JSON text, matching the bytesize length field.
      ["2", "J", sequence, json.bytesize, json].pack("AANNA*")
    end
  end # JsonEncoder
end
# Demo run: connect to localhost:5044, send a single batch of 100 events and
# print the value returned by write_sync.
client = Lumberjack::Socket.new("localhost", 5044)
events = Array.new(100) { |index| { "message" => index } }
puts client.write_sync(events)
# When sending data to a beats input with netty >= 3.1.35,
# this write_sync call takes about a minute to return because it is stuck waiting for the ack.
# With older versions of netty this script returns almost immediately.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment