Created
March 5, 2012 18:27
-
-
Save jeremyvdw/1980156 to your computer and use it in GitHub Desktop.
apublish & asubscribe PubNub methods (wrapped in Fibers)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'eventmachine' | |
require 'em-http-request' | |
require 'yajl' | |
module PubSub | |
class Pubnub | |
#** | |
#* Pubnub | |
#* | |
#* Init the Pubnub Client API | |
#* | |
#* @param string publish_key required key to send messages. | |
#* @param string subscribe_key required key to receive messages. | |
#* @param string secret_key required key to sign messages. | |
#* @param boolean ssl required for 2048 bit encrypted messages. | |
#* | |
def initialize(publish_key, subscribe_key, secret_key, ssl_on) | |
@publish_key = publish_key | |
@subscribe_key = subscribe_key | |
@secret_key = secret_key | |
@ssl = ssl_on | |
@origin = 'pubsub.pubnub.com' | |
@limit = 1_800 | |
@timetoken = 0 | |
if @ssl | |
@origin = 'https://' + @origin | |
else | |
@origin = 'http://' + @origin | |
end | |
end | |
#** | |
#* Apublish | |
#* | |
#* This is NON-BLOCKING. | |
#* | |
def apublish(channel, message, callback = nil, errback = nil) | |
## Capture User Input | |
message = message.to_json | |
if ENV['VERBOSE_PUBNUB'] | |
puts "\n ==> PUB to #{channel.inspect}: #{message}\n" | |
end | |
## Sign Message | |
signature = @secret_key.length > 0 ? Digest::MD5.hexdigest([ | |
@publish_key, | |
@subscribe_key, | |
@secret_key, | |
channel, | |
message | |
].join('/')) : '0' | |
## Fail if message (in bytes) too long. | |
## Hard limit set to 1_800 bytes (http://www.pubnub.com/tutorial/developer-intro-tutorial) | |
if message.size_in_bytes > @limit | |
puts('Message TOO LONG (' + @limit.to_s + ' LIMIT)') | |
return [ 0, 'Message Too Long.' ] | |
end | |
## Send Message | |
request = [ | |
'publish', | |
@publish_key, | |
@subscribe_key, | |
signature, | |
channel, | |
'0', | |
message | |
] | |
## Construct Request URL | |
url = @origin + '/' + request.map{ |bit| bit.split('').map{ |ch| | |
' ~`!@#$%^&*()+=[]\\{}|;\':",./<>?'.index(ch) ? | |
'%' + ch.unpack('H2')[0].to_s.upcase : URI.encode(ch) | |
}.join('') }.join('/') | |
Fiber.new { | |
http = EventMachine::HttpRequest.new(url).get(:keepalive => true) | |
start = Time.now.to_f | |
http.errback { | |
message = "Failed to publish message: #{http.error}" | |
errback.call message | |
} | |
http.callback { | |
if ENV['VERBOSE_PUBNUB'] | |
duration = '%.3f' % (Time.now.to_f - start) | |
puts " <== PUBNUB responded in #{duration} seconds: #{http.response.inspect} - for message #{request.last}\n" | |
end | |
callback.call Yajl::Parser.parse(http.response) | |
} | |
}.resume | |
end | |
#** | |
#* Asubscribe | |
#* | |
#* This is NON-BLOCKING. | |
#* | |
def asubscribe(channel, callback = nil, errback = nil, timetoken = 0) | |
request = [ | |
'subscribe', | |
@subscribe_key, | |
channel, | |
'0', | |
timetoken.to_s | |
] | |
## Construct Request URL | |
url = @origin + '/' + request.map{ |bit| bit.split('').map{ |ch| | |
' ~`!@#$%^&*()+=[]\\{}|;\':",./<>?'.index(ch) ? | |
'%' + ch.unpack('H2')[0].to_s.upcase : URI.encode(ch) | |
}.join('') }.join('/') | |
Fiber.new { | |
http = EM::HttpRequest.new(url).get(:keepalive => true) | |
http.errback { | |
# Check if there's a *real* error to call the errback | |
EM::Timer.new(1) do | |
asubscribe(channel, callback, errback, timetoken) | |
end | |
} | |
http.callback { | |
response = Yajl::Parser.parse(http.response) | |
messages = response[0] | |
timetoken = response[1] | |
## If it was a timeout | |
next if !messages.length | |
## Run user Callback and Reconnect if user permits. | |
messages.each do |message| | |
callback.call(message) | |
end | |
asubscribe(channel, callback, errback, timetoken) | |
} | |
}.resume | |
end | |
##################### WIP ################## | |
#** | |
#* History | |
#* | |
#* Load history from a channel. | |
#* | |
#* @param array args with 'channel' and 'limit'. | |
#* @return mixed false on fail, array on success. | |
#* | |
def history(args) | |
## Capture User Input | |
limit = +args['limit'] ? +args['limit'] : 10 | |
channel = args['channel'] | |
## Fail if bad input. | |
if (!channel) | |
puts 'Missing Channel.' | |
return false | |
end | |
## Get History | |
return self._request([ | |
'history', | |
@subscribe_key, | |
channel, | |
'0', | |
limit.to_s | |
]) | |
end | |
#** | |
#* Time | |
#* | |
#* Timestamp from PubNub Cloud. | |
#* | |
#* @return int timestamp. | |
#* | |
def time() | |
return self._request([ | |
'time', | |
'0' | |
])[0] | |
end | |
#** | |
#* Request URL | |
#* | |
#* @param array request of url directories. | |
#* @return array from JSON response. | |
#* | |
def _request(request) | |
## Construct Request URL | |
url = @origin + '/' + request.map{ |bit| bit.split('').map{ |ch| | |
' ~`!@#$%^&*()+=[]\\{}|;\':",./<>?'.index(ch) ? | |
'%' + ch.unpack('H2')[0].to_s.upcase : URI.encode(ch) | |
}.join('') }.join('/') | |
response = '' | |
begin | |
start = Time.now.to_f | |
open(url) do |f| | |
response = f.read | |
end | |
if ENV['VERBOSE_PUBNUB'] | |
duration = '%.3f' % (Time.now.to_f - start) | |
puts " <== PUBNUB responded in #{duration} seconds: #{response.inspect} - for message #{request.last}\n" | |
end | |
return JSON.parse(response) | |
rescue OpenURI::HTTPError => err | |
puts "\n\nPubNub HTTP Error on request. Message:" | |
puts request.last | |
puts "\n Response:" | |
puts err.io.try(:read) | |
puts "\n\n" | |
raise err | |
end | |
end | |
end | |
end | |
class String | |
def size_in_bytes | |
self.unpack("C*").size | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'apubnub' | |
EM.run { | |
channel = 'test' | |
message = 'Hello World' | |
pubnub = PubSub::Pubnub.new(Settings.pubnub.publish_key, Settings.pubnub.subscribe_key, Settings.pubnub.secret_key, true) | |
cb = Proc.new { |reply| | |
puts "Message: #{reply}" | |
} | |
errb = Proc.new { |error| | |
puts "Error: #{error}" | |
} | |
pubnub.asubscribe(channel, cb, errb) | |
pubnub.apublish(channel, message, cb, errb) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment