Created
March 17, 2009 02:47
-
-
Save raggi/80247 to your computer and use it in GitHub Desktop.
eventmachine postgres to sequel async shim
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This code is adapted from em-mysql (thanks, tmm1 <3).
module Sequel | |
class Database
  # Handle that the Dataset#async_* methods send their SQL to. Those methods
  # call #insert, #update, #execute and #select on it, each with an SQL string
  # and a block, so whatever is stored here must respond to all four.
  attr_accessor :_async
end
# Asynchronous variants of the usual dataset operations.
#
# Each method generates SQL with the dataset's own SQL builders, passes it to
# the object stored in +db._async+ and returns +nil+ right away. Results are
# delivered later through the supplied block.
class Dataset
  # Runs the statement from +insert_sql(*args)+ through +db._async.insert+.
  # The block is handed to the async connection untouched.
  def async_insert(*args, &cb)
    db._async.insert(insert_sql(*args), &cb)
    nil
  end

  # Runs the statement from +update_sql(*args)+ through +db._async.update+.
  # The block is handed to the async connection untouched.
  def async_update(*args, &cb)
    db._async.update(update_sql(*args), &cb)
    nil
  end

  # Runs +delete_sql+ through +db._async.execute+.
  def async_delete(&cb)
    db._async.execute(delete_sql, &cb)
    nil
  end

  # Runs the first statement returned by +multi_insert_sql(*args)+ through
  # +db._async.execute+. Any further statements in that list are not sent.
  def async_multi_insert(*args, &cb)
    db._async.execute(multi_insert_sql(*args).first, &cb)
    nil
  end

  # Same as #async_multi_insert, except the first "insert" keyword
  # (case-insensitive) of the statement is rewritten to "INSERT IGNORE".
  #
  # NOTE(review): INSERT IGNORE looks like MySQL syntax carried over from
  # em-mysql; confirm the target backend accepts it before relying on this.
  def async_multi_insert_ignore(*args, &cb)
    statement = multi_insert_sql(*args).first.sub(/insert/i, "INSERT IGNORE")
    db._async.execute(statement, &cb)
    nil
  end

  # Runs +select_sql(*args)+ and yields every resulting row, one at a time,
  # after applying the dataset's transform and row_proc (when set).
  def async_each(*args)
    db._async.select(select_sql(*args)) do |rows|
      rows.each { |row| yield async_load_row(row) }
    end
    nil
  end

  # Runs +sql+ and yields the complete list of rows in one call. Rows are
  # passed through untouched when neither a row_proc nor a transform is set.
  def async_all
    db._async.select(sql) do |rows|
      # Check @transform directly, as the per-row code does, instead of
      # calling a +transform+ method that the rest of this class never uses.
      loaded = row_proc || @transform ? rows.map { |row| async_load_row(row) } : rows
      yield(loaded)
    end
    nil
  end

  # Yields the row count as an Integer. When the dataset's options overlap
  # COUNT_FROM_SELF_OPTS the count is taken over +from_self+ instead.
  def async_count(&cb)
    if options_overlap(COUNT_FROM_SELF_OPTS)
      from_self.async_count(&cb)
    else
      # The count is read from the first value of each yielded row.
      naked.async_each(STOCK_COUNT_OPTS) { |row| yield row.values.first.to_i }
    end
    nil
  end

  private

  # Applies the transform (when @transform is set) and then the row_proc
  # (when present) to a single raw row and returns the result.
  def async_load_row(row)
    row = transform_load(row) if @transform
    row = row_proc[row] if row_proc
    row
  end
end
# Asynchronous helpers for model instances and model classes. All of them
# delegate to the async methods of the underlying dataset.
class Model
  # Issues an asynchronous UPDATE for this record via +this.async_update+ and
  # then applies the same arguments to the in-memory object with +set+,
  # without waiting for the database to answer. Returns +self+.
  def async_update(*args, &cb)
    this.async_update(*args, &cb)
    set(*args)
    self
  end

  # Issues an asynchronous DELETE for this record via +this.async_delete+.
  # Returns +nil+.
  def async_delete(&cb)
    this.async_delete(&cb)
    nil
  end

  class << self
    # Class-level delegators: Model.async_xxx(*args, &cb) forwards to
    # dataset.async_xxx(*args, &cb) and returns whatever that returns.
    [ :async_insert,
      :async_multi_insert,
      :async_multi_insert_ignore,
      :async_each,
      :async_all,
      :async_update,
      :async_count ].each do |method|
      # File and line are supplied so backtraces from the generated methods
      # point at this definition instead of "(eval)".
      class_eval <<-RUBY, __FILE__, __LINE__ + 1
        def #{method}(*args, &cb)
          dataset.#{method}(*args, &cb)
        end
      RUBY
    end

    # Async version of Model#[].
    #
    # +args+ is either a Hash of conditions or a value that is converted to
    # one with +primary_key_hash+. The block is called with the first matching
    # row, or with +nil+ when nothing matched. Returns +nil+.
    def async_lookup(args)
      args = primary_key_hash(args) unless Hash === args
      # +first+ on an empty result is already nil, so no emptiness check is
      # needed before yielding.
      dataset.where(args).limit(1).async_all { |rows| yield rows.first }
      nil
    end
  end
end
end | |
# The shim: provides the insert/update/execute/select interface that the Sequel async methods call.
# Methods meant to be added to an object that already provides +query(sql)+.
# +query+ must return something with a +callback+ hook whose block receives
# (status, result, errors). Every method here runs the SQL through +query+ and
# reports back through the optional block.
#
# NOTE(review): status and errors are ignored by every method, so a failed
# query is reported to the caller exactly like a successful one.
module EventMachine::Protocols::Postgres3::AsyncSequelShim
  # Runs +sql+ and calls the block with the integer found at the end of the
  # result's command tag (presumably the affected-row count; confirm against
  # the protocol implementation).
  def execute(sql, &cb)
    query(sql).callback do |_status, result, _errors|
      cb.call(result.cmd_tag.split(' ').last.to_i) if cb
    end
  end

  # Runs +sql+ and calls the block with +nil+ once the query completes.
  def insert(sql, &cb)
    # TODO see #insert_result, it uses conn.last_insert_id
    query(sql).callback do |_status, _result, _errors|
      cb.call(nil) if cb
    end
  end

  # Runs +sql+ and calls the block with +nil+ once the query completes.
  def update(sql, &cb)
    query(sql).callback do |_status, _result, _errors|
      cb.call(nil) if cb
    end
  end

  # Runs +sql+ and calls the block with the result rows, each converted in
  # place to a Hash keyed by the field names of the result.
  def select(sql, &cb)
    query(sql).callback do |_status, result, _errors|
      next unless cb
      names = result.fields.map { |f| f.name }
      # Build each Hash from [name, value] pairs; flattening the pairs first
      # would also flatten any value that is itself an Array.
      result.rows.map! { |values| Hash[names.zip(values)] }
      cb.call(result.rows)
    end
  end
end
# Load the dependencies only when this file is executed directly. BEGIN runs
# before the rest of the file, so the libraries are in place ahead of the
# definitions that appear earlier in the file.
BEGIN { %w(rubygems sequel eventmachine).each { |r| require r } if __FILE__ == $0 }

# Demo: connect to a local database, install the shim on the connection and
# perform a single asynchronous insert, printing what the callback receives.
if __FILE__ == $0
  DB = Sequel.connect "postgres:///raggi"

  # DB.create_table(:a)
  # DB.alter_table(:a) do
  #   primary_key :id
  #   add_column :one, :text
  #   add_column :two, :integer
  # end

  EM.run do
    EM.connect("/tmp/.s.PGSQL.5432", EventMachine::Protocols::Postgres3) do |db|
      db.connect('raggi', 'raggi').callback do |status, error|
        abort(error) unless status

        # Give the raw connection the interface the Sequel async methods use,
        # then register it as the database's async handle.
        db.extend EventMachine::Protocols::Postgres3::AsyncSequelShim
        DB._async = db

        DB[:a].async_insert(:one => 'blah', :two => 20) do |res|
          p res
          EM.stop
        end
      end
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment