Skip to content

Instantly share code, notes, and snippets.

@mgttt
Forked from maxclark/admin.lua
Created October 18, 2020 03:00
Show Gist options
  • Save mgttt/f61410f009a0a110ebacfbc02848b837 to your computer and use it in GitHub Desktop.
Save mgttt/f61410f009a0a110ebacfbc02848b837 to your computer and use it in GitHub Desktop.
The elusive read/write splitting lua for the mysql-proxy. Includes connection pooling and read load balancing.
--[[
Copyright 2008, 2010, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
--]]
-- admin.lua
--[[
See http://www.chriscalender.com/?p=41
(Thanks to Chris Calender)
See http://datacharmer.blogspot.com/2009/01/mysql-proxy-is-back.html
(Thanks Giuseppe Maxia)
--]]
--- Stage an error packet as the pending proxy response.
--
-- Replaces proxy.response as a whole with a MYSQLD_PACKET_ERR packet.
--
-- @param errmsg message for the client; "error" is used when it is nil or false
function set_error(errmsg)
  local message = errmsg
  if not message then
    message = "error"
  end
  proxy.response = { type = proxy.MYSQLD_PACKET_ERR, errmsg = message }
end
--- Admin-plugin query hook: answers a small set of built-in commands.
--
-- Recognised input (anything else yields a generic error packet):
--   * show querycounter        - one-row resultset with proxy.global.query_counter
--   * show myerror             - a demo error packet
--   * select help              - delegated to show_process_help() (case-insensitive)
--   * show proxy processlist   - delegated to show_process_table() (case-insensitive)
--   * SELECT * FROM backends   - one row per entry of proxy.global.backends (exact match)
--
-- @param packet raw client packet; the first byte is the command type
-- @return proxy.PROXY_SEND_RESULT, or whatever the delegated helper returns
function read_query(packet)
  if packet:byte() ~= proxy.COM_QUERY then
    set_error("[admin] we only handle text-based queries (COM_QUERY)")
    return proxy.PROXY_SEND_RESULT
  end

  -- everything after the command byte is the statement text
  local sql = packet:sub(2)

  -- first word is the command, the second word (if any) is its option
  local _, command_end, command = string.find(packet, "^%s*(%w+)", 2)
  local option
  if command_end then
    option = string.match(packet, "^%s+(%w+)", command_end + 1)
  end

  if command == "show" then
    if option == "querycounter" then
      -- PROXY_SEND_RESULT needs response.type plus, for an OK packet,
      -- a resultset made of fields and rows
      proxy.response.type = proxy.MYSQLD_PACKET_OK
      proxy.response.resultset = {
        fields = {
          { type = proxy.MYSQL_TYPE_LONG, name = "query_counter", },
        },
        rows = {
          { proxy.global.query_counter }
        }
      }
      return proxy.PROXY_SEND_RESULT
    elseif option == "myerror" then
      -- for an ERR packet only errmsg is needed
      proxy.response.type = proxy.MYSQLD_PACKET_ERR
      proxy.response.errmsg = "my first error"
      return proxy.PROXY_SEND_RESULT
    end
  end

  local sql_lower = sql:lower()
  if sql_lower == 'select help' then
    return show_process_help()
  end
  if sql_lower == 'show proxy processlist' then
    return show_process_table()
  end

  if sql ~= "SELECT * FROM backends" then
    set_error()
    return proxy.PROXY_SEND_RESULT
  end

  local rows = { }
  for ndx = 1, #proxy.global.backends do
    local backend = proxy.global.backends[ndx]
    rows[ndx] = { ndx, backend.dst.name, backend.state, backend.type }
  end

  proxy.response = {
    type = proxy.MYSQLD_PACKET_OK,
    resultset = {
      fields = {
        { name = "backend_ndx", type = proxy.MYSQL_TYPE_LONG },
        { name = "address", type = proxy.MYSQL_TYPE_STRING },
        { name = "state", type = proxy.MYSQL_TYPE_STRING },
        { name = "type", type = proxy.MYSQL_TYPE_STRING },
      },
      rows = rows
    }
  }
  return proxy.PROXY_SEND_RESULT
end
--- Fill proxy.response with an OK resultset built from plain Lua tables.
--
-- Every column is declared as proxy.MYSQL_TYPE_STRING.
--
-- @param header  sequence of column names
-- @param dataset sequence of rows, each row being a sequence of cell values
-- @return proxy.PROXY_SEND_RESULT
function make_dataset (header, dataset)
  local fields, rows = {}, {}

  -- ipairs rather than pairs: the traversal order of pairs is unspecified,
  -- and both the column order and the row order are meaningful here
  for i, name in ipairs(header) do
    fields[i] = { type = proxy.MYSQL_TYPE_STRING, name = name }
  end
  for i, row in ipairs(dataset) do
    rows[i] = row
  end

  proxy.response.type = proxy.MYSQLD_PACKET_OK
  proxy.response.resultset = { fields = fields, rows = rows }
  return proxy.PROXY_SEND_RESULT
end
--- Answer SHOW PROXY PROCESSLIST: one row per recorded login.
--
-- Reads proxy.global.process, a map of session id to a list of
-- { ip = ..., ts = ... } entries. That table only exists once
-- proxy.global.initialize_process_table() has run, so a missing table is
-- reported as an empty list instead of raising inside pairs().
--
-- @return the result of make_dataset()
function show_process_table()
  local header = { 'Id', 'IP Address', 'Time' }
  local rows = {}
  for session_id, entries in pairs(proxy.global.process or {}) do
    for _, entry in pairs(entries) do
      rows[#rows + 1] = { session_id, entry.ip, os.date('%c', entry.ts) }
    end
  end
  return make_dataset(header, rows)
end
--- Answer SELECT HELP with the list of commands and their descriptions.
--
-- @return the result of make_dataset()
function show_process_help()
  return make_dataset(
    { 'command', 'description' },
    {
      { 'SELECT HELP', 'This command.' },
      { 'SHOW PROXY PROCESSLIST', 'Show all connections and their true IP Address.' },
    }
  )
end
--- Print the whole process table to stdout (debugging aid).
--
-- Calls proxy.global.initialize_process_table() first, then prints every
-- session id followed by one tab-prefixed line per recorded entry.
function dump_process_table()
  proxy.global.initialize_process_table()
  print('current contents of process table')
  local process_table = proxy.global.process
  for session_id, entries in pairs(process_table) do
    print('session id: ', session_id)
    for entry_key, entry in pairs(entries) do
      print('\t', entry_key, entry.ip, entry.ts)
    end
  end
  print('---END PROCESS TABLE---')
end
--[[ Help
we use a simple string-match to split commands at word boundaries
mysql> show querycounter
is split into
command = "show"
option = "querycounter"
spaces are ignored, the case has to be as is.
mysql> show myerror
returns an error-packet
--]]
[mysql-proxy]
user_name=mysql-proxy
admin-address = 127.0.0.1:3308
proxy-address = 127.0.0.1:3306
proxy-skip-profiling = true
daemon = 1
pid-file = /var/run/mysql-proxy.pid
log-file = /var/log/mysql-proxy.log
log-level = debug
proxy-backend-addresses = master:3306
proxy-read-only-backend-addresses = slave01:3306,slave02:3306
keepalive=1
admin-username=root
admin-password=secretpassword
admin-lua-script=/usr/lib64/mysql-proxy/lua/admin.lua
#proxy-lua-script=/usr/lib64/mysql-proxy/lua/reporter.lua
proxy-lua-script=/usr/lib64/mysql-proxy/lua/rw-splitting.lua
plugin_dir=/usr/lib64/mysql-proxy/plugins
plugins=proxy,admin
is_debug = false
--[[
Copyright 2008, 2010, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
--]]
-- reporter.lua
--[[
See http://www.chriscalender.com/?p=41
(Thanks to Chris Calender)
See http://datacharmer.blogspot.com/2009/01/mysql-proxy-is-back.html
(Thanks Giuseppe Maxia)
--]]
-- start the shared query counter at 0 unless an earlier load already set it
if not proxy.global.query_counter then
  proxy.global.query_counter = 0
end
--- Make sure the process table and the current connection's slot exist.
--
-- Creates proxy.global.process when it is nil, then creates an empty list
-- under the key proxy.connection.server.thread_id when that slot is nil.
function proxy.global.initialize_process_table()
  local globals = proxy.global
  if globals.process == nil then
    globals.process = {}
  end

  local thread_id = proxy.connection.server.thread_id
  if globals.process[thread_id] == nil then
    globals.process[thread_id] = {}
  end
end
--- Auth-result hook: record the client of every successful login.
--
-- When the first byte of the auth packet is MYSQLD_PACKET_OK, appends
-- { ip = <client source name>, ts = <os.time()> } to the process-table slot
-- of the current server thread id. Any other packet is ignored.
--
-- @param auth auth context; auth.packet is the raw packet
function read_auth_result( auth )
  if auth.packet:byte() ~= proxy.MYSQLD_PACKET_OK then
    return
  end

  proxy.global.initialize_process_table()
  local sessions = proxy.global.process[proxy.connection.server.thread_id]
  local entry = { ip = proxy.connection.client.src.name, ts = os.time() }
  table.insert(sessions, entry)
end
--- Disconnect hook: forget the process-table entry of the closing connection.
--
-- proxy.global.process is only created after a successful login (see
-- read_auth_result), so a client that disconnects before that must not make
-- this hook index a nil table.
function disconnect_client()
  local connection_id = proxy.connection.server.thread_id
  local process = proxy.global.process
  if connection_id and process then
    -- client has disconnected, drop its slot
    process[connection_id] = nil
  end
end
---
-- count every query that passes through the proxy
--
-- read_query() may return
-- * nothing to pass the packet on as is (what this hook always does),
-- * proxy.PROXY_SEND_QUERY to send the queries from the proxy.queries queue
-- * proxy.PROXY_SEND_RESULT to send your own result-set
--
-- @param packet the mysql-packet sent by the client (not inspected here)
--
function read_query( packet )
  -- the counter lives in proxy.global.* so the admin plugin can read it
  local globals = proxy.global
  globals.query_counter = globals.query_counter + 1
end
--[[ $%BEGINLICENSE%$
Copyright (c) 2007, 2009, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; version 2 of the
License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
$%ENDLICENSE%$ --]]
---
-- a flexible statement based load balancer with connection pooling
--
-- * build a connection pool of min_idle_connections for each backend and maintain
-- its size
-- *
--
--
local commands = require("proxy.commands")
local tokenizer = require("proxy.tokenizer")
local lb = require("proxy.balance")
local auto_config = require("proxy.auto-config")
--- config
--
-- connection-pool defaults; only installed when proxy.global.config.rwsplit
-- has not been set already
do
  local config = proxy.global.config
  if not config.rwsplit then
    config.rwsplit = {
      min_idle_connections = 4,
      max_idle_connections = 8,
      is_debug = false
    }
  end
end
---
-- read/write splitting sends all non-transactional SELECTs to the slaves
--
-- The two flags below are written by the hooks of this script and decide
-- whether the current backend may be released after a result arrives.
--
-- is_in_transaction mirrors flags.in_trans of the last client result
-- (assigned in read_query_result); while it is true read_query does not
-- re-route SELECTs and read_query_result keeps the backend
local is_in_transaction = false
-- set by read_query when a SELECT contains SQL_CALC_FOUND_ROWS and cleared
-- on the next SELECT; while true read_query_result keeps the backend
local is_in_select_calc_found_rows = false
---
-- choose the backend for a client that is connecting
--
-- Walks proxy.global.backends in order and
-- * copies the configured min/max idle limits onto each visited pool,
-- * picks the first RW or RO backend that is not DOWN and whose idle
--   connection count (for the "" user) is below min_idle_connections,
-- * otherwise falls back to the first RW backend that is not DOWN.
--
-- @return proxy.PROXY_IGNORE_RESULT when proxy.connection.server is already
--         set for the chosen backend, nothing otherwise
--
function connect_server()
  local cfg = proxy.global.config.rwsplit
  local is_debug = cfg.is_debug

  -- make sure that we connect to each backend at least once to
  -- keep the connections to the servers alive
  --
  -- on read_query we can switch the backends again to another backend
  if is_debug then
    print()
    print("[connect_server] " .. proxy.connection.client.src.name)
  end

  local first_rw = 0
  local backends = proxy.global.backends

  for ndx = 1, #backends do
    local backend = backends[ndx]
    local pool = backend.pool
    -- we don't have a username yet, so look at the connections idling
    -- under the empty user name
    local idle = pool.users[""].cur_idle_connections

    pool.min_idle_connections = cfg.min_idle_connections
    pool.max_idle_connections = cfg.max_idle_connections

    if is_debug then
      local tag = " [" .. ndx .. "]"
      print(tag .. ".connected_clients = " .. backend.connected_clients)
      print(tag .. ".pool.cur_idle = " .. idle)
      print(tag .. ".pool.max_idle = " .. pool.max_idle_connections)
      print(tag .. ".pool.min_idle = " .. pool.min_idle_connections)
      print(tag .. ".type = " .. backend.type)
      print(tag .. ".state = " .. backend.state)
    end

    local is_rw = backend.type == proxy.BACKEND_TYPE_RW
    local is_ro = backend.type == proxy.BACKEND_TYPE_RO
    local is_up = backend.state ~= proxy.BACKEND_STATE_DOWN

    if (is_rw or is_ro) and is_up and idle < pool.min_idle_connections then
      -- this pool is short of idle connections: use this backend
      proxy.connection.backend_ndx = ndx
      break
    elseif is_rw and is_up and first_rw == 0 then
      -- remember the first usable master as the fallback
      first_rw = ndx
    end
  end

  if proxy.connection.backend_ndx == 0 then
    if is_debug then
      print(" [" .. first_rw .. "] taking master as default")
    end
    proxy.connection.backend_ndx = first_rw
  end

  -- did we get a backend connection ?
  if proxy.connection.server then
    if is_debug then
      print(" using pooled connection from: " .. proxy.connection.backend_ndx)
    end
    -- stay with it
    return proxy.PROXY_IGNORE_RESULT
  end

  if is_debug then
    print(" [" .. proxy.connection.backend_ndx .. "] idle-conns below min-idle")
  end
  -- returning nothing: open a new connection
end
---
-- react to the server's answer to the authentication
--
-- On MYSQLD_PACKET_OK proxy.connection.backend_ndx is reset to 0 (the
-- original author's intent: detach from the server so the authenticated
-- connection goes into the connection pool).
--
-- @param auth the context information for the auth
--
-- auth.packet is the packet
function read_auth_result( auth )
  -- read the flag from the shared config like every other hook in this
  -- script; a bare `is_debug` here would be an unrelated global
  local is_debug = proxy.global.config.rwsplit.is_debug

  if is_debug then
    print("[read_auth_result] " .. proxy.connection.client.src.name)
  end

  local status = auth.packet:byte()
  if status == proxy.MYSQLD_PACKET_OK then
    -- auth was fine, disconnect from the server
    proxy.connection.backend_ndx = 0
  elseif status == proxy.MYSQLD_PACKET_EOF then
    -- MYSQLD_PACKET_EOF means an OLD PASSWORD (4.0) was sent
    print("(read_auth_result) ... not ok yet");
  elseif status == proxy.MYSQLD_PACKET_ERR then
    -- auth failed, nothing to do
  end
end
---
-- read/write splitting
--
-- Decides which backend receives the client's packet:
-- * auto_config gets the first chance to answer the command itself
-- * COM_QUIT is answered locally with an OK packet and never forwarded
-- * COM_BINLOG_DUMP is passed through unchanged, on a master if no backend
--   is selected yet
-- * a SELECT outside a transaction is routed to the backend suggested by
--   lb.idle_ro(), unless it refers to LAST_INSERT_ID / @@INSERT_ID
-- * everything else goes to the current backend, or to the one returned by
--   lb.idle_failsafe_rw() when none is selected
--
-- If the client's default db differs from the server's, a COM_INIT_DB is
-- injected (id 2) in front of the client's query (id 1).
--
-- All diagnostic output is printed only when rwsplit.is_debug is set.
--
-- @param packet the mysql-packet sent by the client
-- @return the auto_config result, proxy.PROXY_SEND_RESULT for COM_QUIT,
--         nothing for COM_BINLOG_DUMP, proxy.PROXY_SEND_QUERY otherwise
function read_query( packet )
  local is_debug = proxy.global.config.rwsplit.is_debug
  local cmd = commands.parse(packet)
  local c = proxy.connection.client

  local r = auto_config.handle(cmd)
  if r then return r end

  -- looks like we have to forward this statement to a backend
  if is_debug then
    print("[read_query] " .. proxy.connection.client.src.name)
    print(" current backend = " .. proxy.connection.backend_ndx)
    -- default_db is treated as optional further down, so don't let a
    -- missing value break the debug output
    print(" client default db = " .. tostring(c.default_db))
    print(" client username = " .. c.username)
    if cmd.type == proxy.COM_QUERY then
      print(" query = " .. cmd.query)
    end
  end

  if cmd.type == proxy.COM_QUIT then
    -- don't send COM_QUIT to the backend. We manage the connection
    -- in all aspects.
    proxy.response = {
      type = proxy.MYSQLD_PACKET_OK,
    }
    if is_debug then
      print(" (QUIT) current backend = " .. proxy.connection.backend_ndx)
    end
    return proxy.PROXY_SEND_RESULT
  end

  -- COM_BINLOG_DUMP packet can't be balanced
  --
  -- so we must send it always to the master
  if cmd.type == proxy.COM_BINLOG_DUMP then
    -- if we don't have a backend selected, let's pick the master
    if proxy.connection.backend_ndx == 0 then
      proxy.connection.backend_ndx = lb.idle_failsafe_rw()
    end
    return
  end

  proxy.queries:append(1, packet, { resultset_is_needed = true })

  -- read/write splitting
  --
  -- send all non-transactional SELECTs to a slave
  if not is_in_transaction and
     cmd.type == proxy.COM_QUERY then
    local tokens = assert(tokenizer.tokenize(cmd.query))
    local stmt = tokenizer.first_stmt_token(tokens)

    if stmt.token_name == "TK_SQL_SELECT" then
      is_in_select_calc_found_rows = false
      local is_insert_id = false

      for i = 1, #tokens do
        local token = tokens[i]
        -- SQL_CALC_FOUND_ROWS + FOUND_ROWS() have to be executed
        -- on the same connection
        if not is_in_select_calc_found_rows and token.token_name == "TK_SQL_SQL_CALC_FOUND_ROWS" then
          is_in_select_calc_found_rows = true
        elseif not is_insert_id and token.token_name == "TK_LITERAL" then
          local utext = token.text:upper()
          if utext == "LAST_INSERT_ID" or
             utext == "@@INSERT_ID" then
            is_insert_id = true
          end
        end

        -- we found the two special tokens, we can't find more
        if is_insert_id and is_in_select_calc_found_rows then
          break
        end
      end

      -- if we ask for the last-insert-id we have to ask it on the original
      -- connection
      if not is_insert_id then
        local backend_ndx = lb.idle_ro()
        if backend_ndx > 0 then
          proxy.connection.backend_ndx = backend_ndx
        end
      elseif is_debug then
        print(" found a SELECT LAST_INSERT_ID(), staying on the same backend")
      end
    end
  end

  -- no backend selected yet, pick a master as a good default
  if proxy.connection.backend_ndx == 0 then
    proxy.connection.backend_ndx = lb.idle_failsafe_rw()
  end

  -- by now we should have a backend
  --
  -- if there still is none (e.g. the master is down) hand the query over
  -- without touching proxy.connection.server
  if proxy.connection.backend_ndx == 0 then
    return proxy.PROXY_SEND_QUERY
  end

  local s = proxy.connection.server

  -- if client and server db don't match, adjust the server-side
  --
  -- skip it if we send a INIT_DB anyway
  if cmd.type ~= proxy.COM_INIT_DB and
     c.default_db and c.default_db ~= s.default_db then
    if is_debug then
      print(" server default db: " .. tostring(s.default_db))
      print(" client default db: " .. c.default_db)
      print(" syncronizing")
    end
    proxy.queries:prepend(2, string.char(proxy.COM_INIT_DB) .. c.default_db, { resultset_is_needed = true })
  end

  -- send to master
  if is_debug then
    if proxy.connection.backend_ndx > 0 then
      local b = proxy.global.backends[proxy.connection.backend_ndx]
      print(" sending to backend : " .. b.dst.name);
      print(" is_slave : " .. tostring(b.type == proxy.BACKEND_TYPE_RO));
      print(" server default db: " .. tostring(s.default_db))
      print(" server username : " .. s.username)
    end
    print(" in_trans : " .. tostring(is_in_transaction))
    print(" in_calc_found : " .. tostring(is_in_select_calc_found_rows))
    print(" COM_QUERY : " .. tostring(cmd.type == proxy.COM_QUERY))
  end

  return proxy.PROXY_SEND_QUERY
end
---
-- as long as we are in a transaction keep the connection
-- otherwise release it so another client can use it
--
-- Results with id 1 belong to the client's own query; id 2 is the injected
-- INIT_DB. A failed INIT_DB is turned into an error packet for the client,
-- every other non-client result is dropped.
--
-- @param inj the injection whose result arrived; inj.resultset must be set
-- @return proxy.PROXY_SEND_RESULT when the injected INIT_DB failed,
--         proxy.PROXY_IGNORE_RESULT for other injected results,
--         nothing for the client's own query
function read_query_result( inj )
  local is_debug = proxy.global.config.rwsplit.is_debug
  local res = assert(inj.resultset)
  local flags = res.flags

  if inj.id ~= 1 then
    -- the injected INIT_DB failed as the slave doesn't have this DB
    -- or doesn't have permissions to read from it
    if inj.id == 2 and res.query_status == proxy.MYSQLD_PACKET_ERR then
      proxy.queries:reset()

      local client_db = proxy.connection.client.default_db
      local backend_name = proxy.global.backends[proxy.connection.backend_ndx].dst.name
      proxy.response = {
        type = proxy.MYSQLD_PACKET_ERR,
        errmsg = "can't change DB ".. client_db .. " to on slave " .. backend_name
      }
      return proxy.PROXY_SEND_RESULT
    end

    -- ignore the result of every other injected query
    return proxy.PROXY_IGNORE_RESULT
  end

  is_in_transaction = flags.in_trans
  local have_last_insert_id = (res.insert_id and (res.insert_id > 0))

  local must_stay = is_in_transaction or
    is_in_select_calc_found_rows or
    have_last_insert_id

  if not must_stay then
    -- release the backend
    proxy.connection.backend_ndx = 0
  elseif is_debug then
    print("(read_query_result) staying on the same backend")
    print(" in_trans : " .. tostring(is_in_transaction))
    print(" in_calc_found : " .. tostring(is_in_select_calc_found_rows))
    print(" have_insert_id : " .. tostring(have_last_insert_id))
  end
end
---
-- client went away: let go of the backend connection
--
-- Resets proxy.connection.backend_ndx to 0 so that, per the original
-- author's note, the server connection is moved into the pool.
-- The function itself returns nothing.
function disconnect_client()
  if proxy.global.config.rwsplit.is_debug then
    print("[disconnect_client] " .. proxy.connection.client.src.name)
  end

  -- make sure we are disconnected from the connection
  -- to move the connection into the pool
  proxy.connection.backend_ndx = 0
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment