Skip to content

Instantly share code, notes, and snippets.

@denysvitali
Created February 7, 2017 20:34
Show Gist options
  • Save denysvitali/8b9f17b745994786a53b9ea23144c7cd to your computer and use it in GitHub Desktop.
Save denysvitali/8b9f17b745994786a53b9ea23144c7cd to your computer and use it in GitHub Desktop.
require "./ssdp"
module SSDP
class Consumer
@options : SSDP::Options
def initialize(options={} of Symbol=>(String|Bool|Int32))
@options = SSDP::DEFAULTS.from(options.to_h)
@search_socket = SSDP.create_broadcaster
@watch = {
"socket" => nil,
"thread" => nil,
"services" => {} of String => String,
}
end
def search(options : NamedTuple)
options = @options.from(@options.to_h.merge options.to_h)
fail "SSDP consumer async search missing callback." if (options["synchronous"] == false) && options["callback"].nil?
fail "SSDP consumer search accepting multiple responses must specify a timeout value." if (options["first_only"] == false) && (options["timeout"].to_i < 1)
warn "Warning: Calling SSDP search without a service specified." if options["service"].nil? && (options["no_warnings"] != true)
warn "Warning: Calling SSDP search without a timeout value." if (options["timeout"].to_i < 1) && (options["no_warnings"] != true)
@search_socket.send compose_search(options), 0, options["broadcast"], options["port"]
if options["synchronous"]
search_sync options
else
search_async options
end
end
def start_watching_type(type, &block)
@watch["services"][type] = block
start_watch if @watch["thread"].nil?
end
def stop_watching_type(type)
@watch["services"].delete type
stop_watch if (@watch["services"].count == 0) && @watch["thread"]
end
def stop_watching_all
@watch["services"] = {} of KeyType => ValueType
stop_watch if @watch["thread"]
end
def compose_search(options)
if (options["timeout"].to_i > 1)
max_delay = options["timeout"].to_i - 1
else
max_delay = 1
end
query = "M-SEARCH * HTTP/1.1\n" \
"HOST: #{options["broadcast"]}:#{options["port"]}\n" \
"MAN: \"ssdp:discover\"\n" \
"MX: #{max_delay}\n"
query += "ST: #{options["service"]}\n" if options["service"]
options["params"].each { |key, val| query += "#{key}: #{val}\n" } if options["params"]
query + "\n"
end
def search_sync(options)
if options["first_only"]
search_single options
else
search_multi options
end
end
def search_async(options)
if options["first_only"]
Thread.new { search_single options }
else
Thread.new { search_multi options }
end
end
def search_single(options)
result = nil
found = false
if options["timeout"]
began = Time.now
remaining = options["timeout"]
while !found && remaining > 0
ready = IO.select [@search_socket], nil, nil, remaining
if ready
message, producer = @search_socket.recvfrom options["maxpack"]
result = process_ssdp_packet message, producer
found = options["filter"].nil? ? true : options["filter"].call(result)
end
remaining = options["timeout"] - (Time.now - began).to_i
end
else
until found
message, producer = @search_socket.recvfrom options["maxpack"]
result = process_ssdp_packet message, producer
found = options["filter"].nil? ? true : options["filter"].call(result)
end
end
if options["synchronous"]
result
else
options["callback"].call result
end
end
def search_multi(options)
remaining = options["timeout"]
responses = [] of ElementType
while remaining > 0
start_time = Time.now
ready = IO.select [@search_socket], nil, nil, remaining
if ready
message, producer = @search_socket.recvfrom options["maxpack"]
if options["filter"].nil?
responses << process_ssdp_packet(message, producer)
else
result = process_ssdp_packet message, producer
responses << result if options["filter"].call(result)
end
end
remaining -= (Time.now - start_time).to_i
end
if options["synchronous"]
responses
else
options["callback"].call responses
end
end
def process_ssdp_packet(message, producer)
ssdp = SSDP.parse_ssdp message
{:address => producer[3], :port => producer[1]}.merge ssdp
end
def start_watch
@watch["socket"] = SSDP.create_listener @options
@watch["thread"] = Thread.new do
begin
loop do
message, producer = @watch["socket"].recvfrom @options["maxpack"]
notification = process_ssdp_packet message, producer
notification_type = notification["params"]["NT"]
@watch["services"][notification_type].call notification if @watch["services"].include? notification_type
end
ensure
@watch["socket"].close
end
end
end
def stop_watch
@watch["thread"].exit
@watch["thread"] = nil
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment