Created
February 7, 2017 20:34
-
-
Save denysvitali/8b9f17b745994786a53b9ea23144c7cd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "./ssdp" | |
module SSDP | |
class Consumer | |
@options : SSDP::Options | |
def initialize(options={} of Symbol=>(String|Bool|Int32)) | |
@options = SSDP::DEFAULTS.from(options.to_h) | |
@search_socket = SSDP.create_broadcaster | |
@watch = { | |
"socket" => nil, | |
"thread" => nil, | |
"services" => {} of String => String, | |
} | |
end | |
def search(options : NamedTuple) | |
options = @options.from(@options.to_h.merge options.to_h) | |
fail "SSDP consumer async search missing callback." if (options["synchronous"] == false) && options["callback"].nil? | |
fail "SSDP consumer search accepting multiple responses must specify a timeout value." if (options["first_only"] == false) && (options["timeout"].to_i < 1) | |
warn "Warning: Calling SSDP search without a service specified." if options["service"].nil? && (options["no_warnings"] != true) | |
warn "Warning: Calling SSDP search without a timeout value." if (options["timeout"].to_i < 1) && (options["no_warnings"] != true) | |
@search_socket.send compose_search(options), 0, options["broadcast"], options["port"] | |
if options["synchronous"] | |
search_sync options | |
else | |
search_async options | |
end | |
end | |
def start_watching_type(type, &block) | |
@watch["services"][type] = block | |
start_watch if @watch["thread"].nil? | |
end | |
def stop_watching_type(type) | |
@watch["services"].delete type | |
stop_watch if (@watch["services"].count == 0) && @watch["thread"] | |
end | |
def stop_watching_all | |
@watch["services"] = {} of KeyType => ValueType | |
stop_watch if @watch["thread"] | |
end | |
def compose_search(options) | |
if (options["timeout"].to_i > 1) | |
max_delay = options["timeout"].to_i - 1 | |
else | |
max_delay = 1 | |
end | |
query = "M-SEARCH * HTTP/1.1\n" \ | |
"HOST: #{options["broadcast"]}:#{options["port"]}\n" \ | |
"MAN: \"ssdp:discover\"\n" \ | |
"MX: #{max_delay}\n" | |
query += "ST: #{options["service"]}\n" if options["service"] | |
options["params"].each { |key, val| query += "#{key}: #{val}\n" } if options["params"] | |
query + "\n" | |
end | |
def search_sync(options) | |
if options["first_only"] | |
search_single options | |
else | |
search_multi options | |
end | |
end | |
def search_async(options) | |
if options["first_only"] | |
Thread.new { search_single options } | |
else | |
Thread.new { search_multi options } | |
end | |
end | |
def search_single(options) | |
result = nil | |
found = false | |
if options["timeout"] | |
began = Time.now | |
remaining = options["timeout"] | |
while !found && remaining > 0 | |
ready = IO.select [@search_socket], nil, nil, remaining | |
if ready | |
message, producer = @search_socket.recvfrom options["maxpack"] | |
result = process_ssdp_packet message, producer | |
found = options["filter"].nil? ? true : options["filter"].call(result) | |
end | |
remaining = options["timeout"] - (Time.now - began).to_i | |
end | |
else | |
until found | |
message, producer = @search_socket.recvfrom options["maxpack"] | |
result = process_ssdp_packet message, producer | |
found = options["filter"].nil? ? true : options["filter"].call(result) | |
end | |
end | |
if options["synchronous"] | |
result | |
else | |
options["callback"].call result | |
end | |
end | |
def search_multi(options) | |
remaining = options["timeout"] | |
responses = [] of ElementType | |
while remaining > 0 | |
start_time = Time.now | |
ready = IO.select [@search_socket], nil, nil, remaining | |
if ready | |
message, producer = @search_socket.recvfrom options["maxpack"] | |
if options["filter"].nil? | |
responses << process_ssdp_packet(message, producer) | |
else | |
result = process_ssdp_packet message, producer | |
responses << result if options["filter"].call(result) | |
end | |
end | |
remaining -= (Time.now - start_time).to_i | |
end | |
if options["synchronous"] | |
responses | |
else | |
options["callback"].call responses | |
end | |
end | |
def process_ssdp_packet(message, producer) | |
ssdp = SSDP.parse_ssdp message | |
{:address => producer[3], :port => producer[1]}.merge ssdp | |
end | |
def start_watch | |
@watch["socket"] = SSDP.create_listener @options | |
@watch["thread"] = Thread.new do | |
begin | |
loop do | |
message, producer = @watch["socket"].recvfrom @options["maxpack"] | |
notification = process_ssdp_packet message, producer | |
notification_type = notification["params"]["NT"] | |
@watch["services"][notification_type].call notification if @watch["services"].include? notification_type | |
end | |
ensure | |
@watch["socket"].close | |
end | |
end | |
end | |
def stop_watch | |
@watch["thread"].exit | |
@watch["thread"] = nil | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment