|
-- The ngx API table and the "http.parser" binding; the watcher objects
-- defined below build their parsers from the latter.
local ngx = require "ngx"
local lhp = require "http.parser"

-- Module table; public entries are attached to it further down and it is
-- what this file returns.
local M = {}
|
|
|
|
|
--- Format a message and write it to the nginx log.
-- @param level  log level constant, handed to ngx.log unchanged
-- @param t      string.format template
-- @param ...    values substituted into the template
-- @return the formatted message, which lets callers write `return log(...)`
local function log(level, t, ...)
  local formatted = string.format(t, ...)
  ngx.log(level, formatted)
  return formatted
end
|
|
|
|
|
--- Watcher for the request direction of a passthrough.
--
-- It feeds bytes to a parser obtained from lhp.request and raises its
-- `end_of_sequence` flag when the parser reports a URL that differs from the
-- one the watcher was created with.
--
-- Instance fields:
--   url              URL every reported request URL is compared against
--   end_of_sequence  false initially; set to true on the first URL mismatch
--   parser           object returned by lhp.request
M.RequestWatcher = {}

--- Create a watcher bound to `url`.
-- @param url  URL that reported request URLs are compared against
-- @return the new watcher instance
function M.RequestWatcher:new(url)
  -- Instances look their methods up on whichever table `new` was called on.
  self.__index = self
  local watcher = setmetatable({ url = url, end_of_sequence = false }, self)

  -- The callback closes over the instance, so the parser can only be attached
  -- once the instance exists.
  watcher.parser = lhp.request({
    on_url = function(...)
      watcher:on_url(...)
    end,
  })

  return watcher
end

--- Parser callback: compare a reported URL with the expected one.
-- @param url  URL reported by the parser
function M.RequestWatcher:on_url(url)
  log(ngx.DEBUG, "request watcher: check url")

  if self.url == url then
    return
  end

  log(ngx.DEBUG, "request watcher: url mismatch, expect '%s', but got '%s'", self.url, url)
  self.end_of_sequence = true
end

--- Feed a chunk of bytes to the underlying parser.
-- @param bytes  data to parse
-- @return whatever the parser's `execute` returns
function M.RequestWatcher:execute(bytes)
  local parser = self.parser
  return parser:execute(bytes)
end
|
|
|
|
|
--- Watcher for the response direction of a passthrough.
--
-- It feeds bytes to a parser obtained from lhp.response and raises its
-- `end_of_sequence` flag when a message completes after a status other than
-- 401 has been reported.
--
-- Instance fields:
--   non_auth_code    false initially; true once a status other than 401 is seen
--   end_of_sequence  false initially; true once a message completes while
--                    non_auth_code is set
--   parser           object returned by lhp.response
M.ResponseWatcher = {}

--- Create a response watcher.
-- @return the new watcher instance
function M.ResponseWatcher:new()
  -- Instances look their methods up on whichever table `new` was called on.
  self.__index = self
  local watcher = setmetatable({ non_auth_code = false, end_of_sequence = false }, self)

  -- Both callbacks close over the instance, so the parser is attached after
  -- the instance has been created.
  watcher.parser = lhp.response({
    on_status = function(...)
      watcher:on_status(...)
    end,
    on_message_complete = function(...)
      watcher:on_message_complete(...)
    end,
  })

  return watcher
end

--- Parser callback: a complete message has been parsed.
function M.ResponseWatcher:on_message_complete()
  log(ngx.DEBUG, "response watcher: message complete")

  if not self.non_auth_code then
    return
  end

  self.end_of_sequence = true
end

--- Parser callback: a status line has been parsed.
-- @param code  status code reported by the parser
-- @param text  status text reported by the parser
function M.ResponseWatcher:on_status(code, text)
  log(ngx.DEBUG, "response watcher: %s %s", code, text)

  -- NOTE(review): this compares against the number 401; if the parser hands
  -- the code over as a string, every status would count as non-401 - confirm.
  if code == 401 then
    return
  end

  self.non_auth_code = true
end

--- Feed a chunk of bytes to the underlying parser.
-- @param bytes  data to parse
-- @return whatever the parser's `execute` returns
function M.ResponseWatcher:execute(bytes)
  local parser = self.parser
  return parser:execute(bytes)
end
|
|
|
|
|
--- Pump bytes from `source` to `destination` until an error occurs or the
-- watcher reports the end of the sequence.
--
-- Each round reads up to 8192 bytes, forwards whatever arrived (the data of a
-- successful read and/or the partial data accompanying a failed one), and
-- feeds the forwarded bytes to `watcher`.  A "timeout" read error is not
-- fatal: the loop simply reads again.
--
-- @param source       object whose `receive(size)` returns data, err, partial
-- @param destination  object whose `send(data)` returns bytes, err
-- @param prefix       label used in the log messages
-- @param watcher      optional object with an `execute(bytes)` method and an
--                     `end_of_sequence` field
-- @return receive_err, send_err when a write fails or a non-timeout read
--         error occurs (either may be nil); nothing when the watcher ends
--         the sequence
local function transfer(source, destination, prefix, watcher)
  while true do
    local data, receive_err, partial = source:receive(8192)
    local fatal_read = receive_err and receive_err ~= "timeout"

    if fatal_read then
      log(ngx.ERR, "transfer[%s]: read error %s", prefix, receive_err)
    end

    -- Two fixed slots, no nil holes: a missing value becomes "" and is
    -- skipped by the length check, and the order is always data, partial.
    local pending = { data or "", partial or "" }
    for i = 1, 2 do
      local chunk = pending[i]
      if #chunk ~= 0 then
        log(ngx.DEBUG, "transfer[%s]: sending %d bytes", prefix, #chunk)

        local bytes, send_err = destination:send(chunk)
        if send_err then
          log(ngx.ERR, "transfer[%s]: write error %s", prefix, send_err)
          return receive_err, send_err
        end
        log(ngx.DEBUG, "transfer[%s]: %d bytes sent", prefix, bytes)

        if watcher then
          watcher:execute(chunk)
          if watcher.end_of_sequence then
            return
          end
        end
      end
    end

    if fatal_read then
      -- No write failed on this path, so the second result is explicitly nil
      -- rather than a lookup of a `send_err` name that is not in scope here.
      return receive_err, nil
    end
  end
end
|
|
|
|
|
--- Relay the current client connection to `host:port` at the byte level.
--
-- The raw request header is sent upstream first (optionally with the first
-- occurrence of `location_prefix_to_strip` removed); then two light threads
-- copy bytes in both directions, each observed by a watcher, until either of
-- them finishes.  Both threads are then killed and the upstream socket is
-- closed.
--
-- @param host     upstream host handed to the socket's `connect`
-- @param port     upstream port handed to the socket's `connect`
-- @param timeout  optional; when set it is passed as the third argument of
--                 `settimeouts` on both sockets (the first two are 10000)
-- @param location_prefix_to_strip  optional literal text; its first
--                 occurrence in the raw request header is removed before the
--                 header is forwarded
-- @return the logged error message when setup fails, nothing otherwise
function M.passthrough(host, port, timeout, location_prefix_to_strip)
  log(ngx.INFO, "passthrough[%s:%s]: started", host, port)

  local url = ngx.var.uri

  local req_sock, req_err = ngx.req.socket(true)
  if req_err then
    return log(ngx.ERR, "get req_sock error %s", req_err)
  end
  if timeout then
    req_sock:settimeouts(10000, 10000, timeout)
  end

  local resp_sock = ngx.socket.tcp()
  local ok, connect_err = resp_sock:connect(host, port)
  if not ok then
    return log(ngx.ERR, "connect to %s:%s failed cause %s", host, port, connect_err)
  end
  if timeout then
    resp_sock:settimeouts(10000, 10000, timeout)
  end

  local headers = ngx.req.raw_header()
  if location_prefix_to_strip then
    -- Plain-text search: the prefix is a literal, so characters that are
    -- magic in Lua patterns ("-", ".", "%", "+", ...) must not be
    -- interpreted.  Only the first occurrence is removed.
    local first, last = string.find(headers, location_prefix_to_strip, 1, true)
    if first then
      headers = string.sub(headers, 1, first - 1) .. string.sub(headers, last + 1)
    end
  end

  local _, send_err = resp_sock:send(headers)
  if send_err then
    -- Do not leave the upstream connection open on the failure path.
    resp_sock:close()
    return log(ngx.ERR, "send to %s:%s failed cause %s", host, port, send_err)
  end

  -- NOTE(review): the watcher compares parsed URLs against ngx.var.uri but is
  -- primed here with the (possibly stripped) header; if the two disagree it
  -- flags end_of_sequence on this very first request - confirm intended.
  local request_watcher = M.RequestWatcher:new(url)
  request_watcher:execute(headers)

  local request_coroutine = ngx.thread.spawn(transfer, req_sock, resp_sock, "request", request_watcher)
  local response_coroutine = ngx.thread.spawn(transfer, resp_sock, req_sock, "response", M.ResponseWatcher:new())

  -- Returns as soon as one of the two threads terminates; the other one is
  -- then stopped explicitly.
  ngx.thread.wait(request_coroutine, response_coroutine)
  ngx.thread.kill(request_coroutine)
  ngx.thread.kill(response_coroutine)

  resp_sock:close()

  log(ngx.INFO, "passthrough[%s:%s]: done", host, port)
end
|
|
|
-- Hand the module table to whoever loads this file.
return M
Pfff, that is too much information. Is there a step-by-step installation guide that shows how to implement this? Examples with details on what to adjust to one's needs would help, for example an upstream nginx config.