-
-
Save mgttt/f61410f009a0a110ebacfbc02848b837 to your computer and use it in GitHub Desktop.
The elusive read/write splitting Lua script for mysql-proxy. Includes connection pooling and read load balancing.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--[[ | |
Copyright 2008, 2010, Oracle and/or its affiliates. All rights reserved. | |
This program is free software; you can redistribute it and/or modify | |
it under the terms of the GNU General Public License as published by | |
the Free Software Foundation; version 2 of the License. | |
This program is distributed in the hope that it will be useful, | |
but WITHOUT ANY WARRANTY; without even the implied warranty of | |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
GNU General Public License for more details. | |
You should have received a copy of the GNU General Public License | |
along with this program; if not, write to the Free Software | |
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | |
--]] | |
-- admin.lua | |
--[[ | |
See http://www.chriscalender.com/?p=41 | |
(Thanks to Chris Calender) | |
See http://datacharmer.blogspot.com/2009/01/mysql-proxy-is-back.html | |
(Thanks Giuseppe Maxia) | |
--]] | |
---
-- replace proxy.response with an error packet
--
-- @param errmsg text sent to the client; "error" is used when it is nil or false
function set_error(errmsg)
  proxy.response = {
    type = proxy.MYSQLD_PACKET_ERR,
    errmsg = errmsg or "error"
  }
end
---
-- answer the statements sent to the admin interface
--
-- Recognised statements:
--   * show querycounter        (words matched case-sensitively)
--   * show myerror             (words matched case-sensitively)
--   * select help              (any case)
--   * show proxy processlist   (any case)
--   * SELECT * FROM backends   (exact text)
-- anything else is answered with an error packet.
--
-- @param packet the mysql-packet sent by the client
--
-- @return proxy.PROXY_SEND_RESULT, or whatever the show_process_*() helper
--         returns; the answer itself is left in proxy.response
function read_query(packet)
  if packet:byte() ~= proxy.COM_QUERY then
    set_error("[admin] we only handle text-based queries (COM_QUERY)")
    return proxy.PROXY_SEND_RESULT
  end

  -- byte 1 is the command type, the statement text starts at byte 2
  local query = packet:sub(2)

  -- split off the first two alphanumeric words: <command> <option>
  -- the empty capture () yields the position right behind <command>
  local command, after_command = query:match("^%s*(%w+)()")
  local option
  if command then
    option = query:match("^%s+(%w+)", after_command)
  end

  if command == "show" and option == "querycounter" then
    -- the counter only exists once a script that maintains it has run,
    -- report 0 instead of a nil cell until then
    proxy.response = {
      type = proxy.MYSQLD_PACKET_OK,
      resultset = {
        fields = {
          { type = proxy.MYSQL_TYPE_LONG, name = "query_counter" },
        },
        rows = {
          { proxy.global.query_counter or 0 }
        }
      }
    }
    return proxy.PROXY_SEND_RESULT
  end

  if command == "show" and option == "myerror" then
    set_error("my first error")
    return proxy.PROXY_SEND_RESULT
  end

  local lowered = query:lower()
  if lowered == 'select help' then
    return show_process_help()
  end
  if lowered == 'show proxy processlist' then
    return show_process_table()
  end

  if query ~= "SELECT * FROM backends" then
    set_error()
    return proxy.PROXY_SEND_RESULT
  end

  -- one row per configured backend
  local fields = {
    { name = "backend_ndx", type = proxy.MYSQL_TYPE_LONG },
    { name = "address",     type = proxy.MYSQL_TYPE_STRING },
    { name = "state",       type = proxy.MYSQL_TYPE_STRING },
    { name = "type",        type = proxy.MYSQL_TYPE_STRING },
  }
  local rows = { }
  for i = 1, #proxy.global.backends do
    local b = proxy.global.backends[i]
    rows[#rows + 1] = { i, b.dst.name, b.state, b.type }
  end

  proxy.response = {
    type = proxy.MYSQLD_PACKET_OK,
    resultset = {
      fields = fields,
      rows = rows
    }
  }
  return proxy.PROXY_SEND_RESULT
end
---
-- build an OK result-set made of string columns and leave it in proxy.response
--
-- @param header  array of column names, in the order they should be shown
-- @param dataset array of rows, each row being an array of cell values
--
-- @return proxy.PROXY_SEND_RESULT
function make_dataset (header, dataset)
  -- ipairs() keeps the columns and rows in array order,
  -- pairs() gives no ordering guarantee
  local fields = {}
  for i, column_name in ipairs(header) do
    fields[i] = { type = proxy.MYSQL_TYPE_STRING, name = column_name }
  end

  local rows = {}
  for i, row in ipairs(dataset) do
    rows[i] = row
  end

  proxy.response = {
    type = proxy.MYSQLD_PACKET_OK,
    resultset = {
      fields = fields,
      rows = rows
    }
  }
  return proxy.PROXY_SEND_RESULT
end
---
-- list every entry of proxy.global.process as a result-set
--
-- one row is produced per recorded session: the key of the outer table,
-- the stored ip and the stored timestamp formatted with os.date('%c')
--
-- @return the return value of make_dataset()
function show_process_table()
  local header = { 'Id', 'IP Address', 'Time' }
  local rows = {}
  -- the table does not exist until something has recorded a connection,
  -- answer with an empty result-set in that case
  for thread_id, sessions in pairs(proxy.global.process or {}) do
    for _, session in pairs(sessions) do
      rows[#rows + 1] = { thread_id, session.ip, os.date('%c', session.ts) }
    end
  end
  return make_dataset(header, rows)
end
---
-- describe the extra admin commands as a two-column result-set
--
-- @return the return value of make_dataset()
function show_process_help()
  local header = { 'command', 'description' }
  local rows = {
    {'SELECT HELP', 'This command.'},
    {'SHOW PROXY PROCESSLIST', 'Show all connections and their true IP Address.'},
  }
  return make_dataset(header, rows)
end
---
-- print the content of proxy.global.process to stdout (debugging aid)
function dump_process_table()
  -- the initializer is only present when a script has installed it
  if proxy.global.initialize_process_table then
    proxy.global.initialize_process_table()
  end

  print('current contents of process table')
  for thread_id, sessions in pairs(proxy.global.process or {}) do
    print('session id: ', thread_id)
    for session_ndx, session in pairs(sessions) do
      print('\t', session_ndx, session.ip, session.ts)
    end
  end
  print('---END PROCESS TABLE---')
end
--[[ Help
we use a simple string-match to split commands at word-boundaries
mysql> show querycounter
is split into
command = "show"
option = "querycounter"
spaces are ignored, the case has to be as is.
mysql> show myerror
returns an error-packet
--]]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Defaults file for mysql-proxy: read/write splitting proxy plus admin interface.
# NOTE(review): admin-password below is stored in clear text - keep this file
# readable by the service account only.
[mysql-proxy]
# NOTE(review): confirm the option name; upstream documentation may call it `user`.
user_name=mysql-proxy
admin-address = 127.0.0.1:3308
proxy-address = 127.0.0.1:3306
proxy-skip-profiling = true
daemon = 1
pid-file = /var/run/mysql-proxy.pid
log-file = /var/log/mysql-proxy.log
log-level = debug
# one read/write backend, two read-only backends
proxy-backend-addresses = master:3306
proxy-read-only-backend-addresses = slave01:3306,slave02:3306
keepalive=1
admin-username=root
admin-password=secretpassword
admin-lua-script=/usr/lib64/mysql-proxy/lua/admin.lua
# reporter.lua is the script that fills proxy.global.query_counter and
# proxy.global.process; while it is disabled the admin commands
# "show querycounter" and "show proxy processlist" have no data to report.
#proxy-lua-script=/usr/lib64/mysql-proxy/lua/reporter.lua
proxy-lua-script=/usr/lib64/mysql-proxy/lua/rw-splitting.lua
# NOTE(review): confirm the option name; upstream documentation may call it `plugin-dir`.
plugin_dir=/usr/lib64/mysql-proxy/plugins
plugins=proxy,admin
# NOTE(review): rw-splitting.lua reads its debug flag from
# proxy.global.config.rwsplit.is_debug; verify that this key reaches it.
is_debug = false
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--[[ | |
Copyright 2008, 2010, Oracle and/or its affiliates. All rights reserved. | |
This program is free software; you can redistribute it and/or modify | |
it under the terms of the GNU General Public License as published by | |
the Free Software Foundation; version 2 of the License. | |
This program is distributed in the hope that it will be useful, | |
but WITHOUT ANY WARRANTY; without even the implied warranty of | |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
GNU General Public License for more details. | |
You should have received a copy of the GNU General Public License | |
along with this program; if not, write to the Free Software | |
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | |
--]] | |
-- reporter.lua | |
--[[ | |
See http://www.chriscalender.com/?p=41 | |
(Thanks to Chris Calender) | |
See http://datacharmer.blogspot.com/2009/01/mysql-proxy-is-back.html | |
(Thanks Giuseppe Maxia) | |
--]] | |
-- keep the counter across script reloads, start at 0 the first time
proxy.global.query_counter = proxy.global.query_counter or 0
---
-- make sure proxy.global.process exists and holds a (possibly empty) list
-- for the thread id of the current backend connection
function proxy.global.initialize_process_table()
  if proxy.global.process == nil then
    proxy.global.process = {}
  end

  local server = proxy.connection.server
  local thread_id = server and server.thread_id
  -- nil cannot be used as a table key, so there is nothing to set up
  -- as long as no backend thread id is known
  if thread_id == nil then
    return
  end

  if proxy.global.process[thread_id] == nil then
    proxy.global.process[thread_id] = {}
  end
end
---
-- record the client address and login time once the backend accepted the auth
--
-- @param auth the context information for the auth
--
-- auth.packet is the packet
function read_auth_result( auth )
  -- only successful logins are recorded
  if auth.packet:byte() ~= proxy.MYSQLD_PACKET_OK then
    return
  end

  proxy.global.initialize_process_table()
  table.insert( proxy.global.process[proxy.connection.server.thread_id],
    { ip = proxy.connection.client.src.name, ts = os.time() } )
end
---
-- forget the sessions recorded for the backend connection of this client
function disconnect_client()
  local server = proxy.connection.server
  local connection_id = server and server.thread_id
  -- the table is only created on the first successful login, a client
  -- may well disconnect before that
  if connection_id and proxy.global.process then
    -- client has disconnected, set this to nil
    proxy.global.process[connection_id] = nil
  end
end
---
-- read_query() can return a resultset
--
-- You can use read_query() to return a result-set.
--
-- Here it only counts the queries and returns nothing, so the packet is
-- passed on as is.
--
-- @param packet the mysql-packet sent by the client
--
-- @return
--   * nothing to pass on the packet as is,
--   * proxy.PROXY_SEND_QUERY to send the queries from the proxy.queries queue
--   * proxy.PROXY_SEND_RESULT to send your own result-set
--
function read_query( packet )
  -- a new query came in on this connection
  -- using proxy.global.* to make it available to the admin plugin
  proxy.global.query_counter = (proxy.global.query_counter or 0) + 1
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--[[ $%BEGINLICENSE%$ | |
Copyright (c) 2007, 2009, Oracle and/or its affiliates. All rights reserved. | |
This program is free software; you can redistribute it and/or | |
modify it under the terms of the GNU General Public License as | |
published by the Free Software Foundation; version 2 of the | |
License. | |
This program is distributed in the hope that it will be useful, | |
but WITHOUT ANY WARRANTY; without even the implied warranty of | |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
GNU General Public License for more details. | |
You should have received a copy of the GNU General Public License | |
along with this program; if not, write to the Free Software | |
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA | |
02110-1301 USA | |
$%ENDLICENSE%$ --]] | |
--- | |
-- a flexible statement based load balancer with connection pooling | |
-- | |
-- * build a connection pool of min_idle_connections for each backend and maintain | |
-- its size | |
-- * | |
-- | |
-- | |
local commands = require("proxy.commands") | |
local tokenizer = require("proxy.tokenizer") | |
local lb = require("proxy.balance") | |
local auto_config = require("proxy.auto-config") | |
--- config
--
-- connection pool
--
-- only install the defaults when no rwsplit settings are present yet
if not proxy.global.config.rwsplit then
  proxy.global.config.rwsplit = {
    min_idle_connections = 4,
    max_idle_connections = 8,

    is_debug = false
  }
end
---
-- read/write splitting sends all non-transactional SELECTs to the slaves
--
-- is_in_transaction tracks the state of the transactions
local is_in_transaction = false

-- if this was a SELECT SQL_CALC_FOUND_ROWS ... stay on the same connection
local is_in_select_calc_found_rows = false
---
-- get a connection to a backend
--
-- as long as we don't have enough connections in the pool, create new connections
--
-- @return proxy.PROXY_IGNORE_RESULT when proxy.connection.server is set after
--         the backend was chosen, nothing otherwise
function connect_server()
  local is_debug = proxy.global.config.rwsplit.is_debug

  -- make sure that we connect to each backend at least once to
  -- keep the connections to the servers alive
  --
  -- on read_query we can switch the backends again to another backend
  if is_debug then
    print()
    print("[connect_server] " .. proxy.connection.client.src.name)
  end

  -- first master that is not DOWN, used when no pool needs filling
  local rw_ndx = 0

  -- init all backends
  for i = 1, #proxy.global.backends do
    local s = proxy.global.backends[i]
    local pool = s.pool
    -- we don't have a username yet, look at the idle connections
    -- stored under the empty username
    local cur_idle = pool.users[""].cur_idle_connections

    pool.min_idle_connections = proxy.global.config.rwsplit.min_idle_connections
    pool.max_idle_connections = proxy.global.config.rwsplit.max_idle_connections

    if is_debug then
      print(" [".. i .."].connected_clients = " .. s.connected_clients)
      print(" [".. i .."].pool.cur_idle = " .. cur_idle)
      print(" [".. i .."].pool.max_idle = " .. pool.max_idle_connections)
      print(" [".. i .."].pool.min_idle = " .. pool.min_idle_connections)
      print(" [".. i .."].type = " .. s.type)
      print(" [".. i .."].state = " .. s.state)
    end

    local is_up = s.state ~= proxy.BACKEND_STATE_DOWN
    local is_rw = s.type == proxy.BACKEND_TYPE_RW
    local is_ro = s.type == proxy.BACKEND_TYPE_RO

    if (is_rw or is_ro) and is_up and
       cur_idle < pool.min_idle_connections then
      -- the first live backend whose pool is below min-idle gets the connection
      proxy.connection.backend_ndx = i
      break
    elseif is_rw and is_up and rw_ndx == 0 then
      rw_ndx = i
    end
  end

  if proxy.connection.backend_ndx == 0 then
    if is_debug then
      print(" [" .. rw_ndx .. "] taking master as default")
    end
    proxy.connection.backend_ndx = rw_ndx
  end

  -- ok, did we get a backend ?
  if proxy.connection.server then
    if is_debug then
      print(" using pooled connection from: " .. proxy.connection.backend_ndx)
    end

    -- stay with it
    return proxy.PROXY_IGNORE_RESULT
  end

  if is_debug then
    print(" [" .. proxy.connection.backend_ndx .. "] idle-conns below min-idle")
  end

  -- open a new connection
end
---
-- put the successfully authed connection into the connection pool
--
-- @param auth the context information for the auth
--
-- auth.packet is the packet
function read_auth_result( auth )
  -- the flag has to be fetched here, it is not a file-level variable
  local is_debug = proxy.global.config.rwsplit.is_debug
  if is_debug then
    print("[read_auth_result] " .. proxy.connection.client.src.name)
  end

  local state = auth.packet:byte()
  if state == proxy.MYSQLD_PACKET_OK then
    -- auth was fine, disconnect from the server
    proxy.connection.backend_ndx = 0
  elseif state == proxy.MYSQLD_PACKET_EOF then
    -- MYSQLD_PACKET_EOF means a OLD PASSWORD (4.0) was sent,
    -- the auth is not finished yet
    print("(read_auth_result) ... not ok yet");
  elseif state == proxy.MYSQLD_PACKET_ERR then
    -- auth failed
  end
end
---
-- read/write splitting
--
-- * COM_QUIT is answered by the proxy itself
-- * COM_BINLOG_DUMP is passed on unchanged, picking a master if no backend is selected
-- * outside of a transaction a SELECT is moved to the backend returned by
--   lb.idle_ro(), unless it asks for LAST_INSERT_ID / @@INSERT_ID
-- * everything else stays on the current backend or goes to the backend
--   returned by lb.idle_failsafe_rw()
--
-- @param packet the mysql-packet sent by the client
--
-- @return
--   * the result of auto_config.handle() if it returned one
--   * proxy.PROXY_SEND_RESULT for COM_QUIT
--   * nothing for COM_BINLOG_DUMP
--   * proxy.PROXY_SEND_QUERY otherwise
function read_query( packet )
  local is_debug = proxy.global.config.rwsplit.is_debug
  local cmd = commands.parse(packet)
  local c = proxy.connection.client

  local r = auto_config.handle(cmd)
  if r then return r end

  local tokens

  -- looks like we have to forward this statement to a backend
  if is_debug then
    print("[read_query] " .. proxy.connection.client.src.name)
    print(" current backend = " .. proxy.connection.backend_ndx)
    -- tostring() as these fields may be unset, which must not abort the query
    print(" client default db = " .. tostring(c.default_db))
    print(" client username = " .. tostring(c.username))
    if cmd.type == proxy.COM_QUERY then
      print(" query = " .. cmd.query)
    end
  end

  if cmd.type == proxy.COM_QUIT then
    -- don't send COM_QUIT to the backend. We manage the connection
    -- in all aspects.
    proxy.response = {
      type = proxy.MYSQLD_PACKET_OK,
    }

    if is_debug then
      print(" (QUIT) current backend = " .. proxy.connection.backend_ndx)
    end

    return proxy.PROXY_SEND_RESULT
  end

  -- COM_BINLOG_DUMP packet can't be balanced
  --
  -- so we must send it always to the master
  if cmd.type == proxy.COM_BINLOG_DUMP then
    -- if we don't have a backend selected, let's pick the master
    if proxy.connection.backend_ndx == 0 then
      proxy.connection.backend_ndx = lb.idle_failsafe_rw()
    end

    return
  end

  proxy.queries:append(1, packet, { resultset_is_needed = true })

  -- read/write splitting
  --
  -- send all non-transactional SELECTs to a slave
  if not is_in_transaction and
     cmd.type == proxy.COM_QUERY then
    tokens = tokens or assert(tokenizer.tokenize(cmd.query))

    local stmt = tokenizer.first_stmt_token(tokens)

    if stmt and stmt.token_name == "TK_SQL_SELECT" then
      is_in_select_calc_found_rows = false
      local is_insert_id = false

      for i = 1, #tokens do
        local token = tokens[i]
        -- SQL_CALC_FOUND_ROWS + FOUND_ROWS() have to be executed
        -- on the same connection
        if not is_in_select_calc_found_rows and token.token_name == "TK_SQL_SQL_CALC_FOUND_ROWS" then
          is_in_select_calc_found_rows = true
        elseif not is_insert_id and token.token_name == "TK_LITERAL" then
          local utext = token.text:upper()

          if utext == "LAST_INSERT_ID" or
             utext == "@@INSERT_ID" then
            is_insert_id = true
          end
        end

        -- we found the two special tokens, we can't find more
        if is_insert_id and is_in_select_calc_found_rows then
          break
        end
      end

      -- if we ask for the last-insert-id we have to ask it on the original
      -- connection
      if not is_insert_id then
        local backend_ndx = lb.idle_ro()

        if backend_ndx > 0 then
          proxy.connection.backend_ndx = backend_ndx
        end
      else
        print(" found a SELECT LAST_INSERT_ID(), staying on the same backend")
      end
    end
  end

  -- no backend selected yet, pick a master
  if proxy.connection.backend_ndx == 0 then
    -- we don't have a backend right now
    --
    -- let's pick a master as a good default
    proxy.connection.backend_ndx = lb.idle_failsafe_rw()
  end

  -- by now we should have a backend
  --
  -- in case the master is down, we have to close the client connections
  -- otherwise we can go on
  if proxy.connection.backend_ndx == 0 then
    return proxy.PROXY_SEND_QUERY
  end

  local s = proxy.connection.server

  -- if client and server db don't match, adjust the server-side
  --
  -- skip it if we send a INIT_DB anyway
  if cmd.type ~= proxy.COM_INIT_DB and
     c.default_db and c.default_db ~= s.default_db then
    -- the server side db may be unset, hence the tostring()
    print(" server default db: " .. tostring(s.default_db))
    print(" client default db: " .. c.default_db)
    print(" syncronizing")
    proxy.queries:prepend(2, string.char(proxy.COM_INIT_DB) .. c.default_db, { resultset_is_needed = true })
  end

  -- send to master
  if is_debug then
    if proxy.connection.backend_ndx > 0 then
      local b = proxy.global.backends[proxy.connection.backend_ndx]
      print(" sending to backend : " .. b.dst.name);
      print(" is_slave : " .. tostring(b.type == proxy.BACKEND_TYPE_RO));
      print(" server default db: " .. tostring(s.default_db))
      print(" server username : " .. tostring(s.username))
    end
    print(" in_trans : " .. tostring(is_in_transaction))
    print(" in_calc_found : " .. tostring(is_in_select_calc_found_rows))
    print(" COM_QUERY : " .. tostring(cmd.type == proxy.COM_QUERY))
  end

  return proxy.PROXY_SEND_QUERY
end
---
-- as long as we are in a transaction keep the connection
-- otherwise release it so another client can use it
--
-- @param inj the injected query together with its resultset;
--            id 1 is the client's query, id 2 the injected INIT_DB
--
-- @return
--   * proxy.PROXY_SEND_RESULT with an error packet if the injected INIT_DB failed
--   * proxy.PROXY_IGNORE_RESULT for every other result that isn't the client's query
--   * nothing for the result of the client's query
function read_query_result( inj )
  local is_debug = proxy.global.config.rwsplit.is_debug
  local res = assert(inj.resultset)

  if inj.id ~= 1 then
    -- ignore the result of the USE <default_db>
    -- the DB might not exist on the backend, what to do ?
    if inj.id == 2 and res.query_status == proxy.MYSQLD_PACKET_ERR then
      -- the injected INIT_DB failed as the slave doesn't have this DB
      -- or doesn't have permissions to read from it
      proxy.queries:reset()

      proxy.response = {
        type = proxy.MYSQLD_PACKET_ERR,
        errmsg = "can't change DB to " .. tostring(proxy.connection.client.default_db) ..
          " on slave " .. proxy.global.backends[proxy.connection.backend_ndx].dst.name
      }

      return proxy.PROXY_SEND_RESULT
    end

    return proxy.PROXY_IGNORE_RESULT
  end

  is_in_transaction = res.flags.in_trans
  local have_last_insert_id = (res.insert_id and (res.insert_id > 0))

  if not is_in_transaction and
     not is_in_select_calc_found_rows and
     not have_last_insert_id then
    -- release the backend
    proxy.connection.backend_ndx = 0
  elseif is_debug then
    print("(read_query_result) staying on the same backend")
    print(" in_trans : " .. tostring(is_in_transaction))
    print(" in_calc_found : " .. tostring(is_in_select_calc_found_rows))
    print(" have_insert_id : " .. tostring(have_last_insert_id))
  end
end
---
-- close the connections if we have enough connections in the pool
--
-- @return nil - close connection
--         IGNORE_RESULT - store connection in the pool
function disconnect_client()
  local is_debug = proxy.global.config.rwsplit.is_debug
  if is_debug then
    print("[disconnect_client] " .. proxy.connection.client.src.name)
  end

  -- make sure we are disconnected from the connection
  -- to move the connection into the pool
  proxy.connection.backend_ndx = 0
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment