Skip to content

Instantly share code, notes, and snippets.

@shankerwangmiao
Last active December 6, 2023 13:54
Show Gist options
  • Save shankerwangmiao/ffe48d51eef5b7178a442ba4c32568a2 to your computer and use it in GitHub Desktop.
Save shankerwangmiao/ffe48d51eef5b7178a442ba4c32568a2 to your computer and use it in GitHub Desktop.
Rsync proxy for nginx
# nginx configuration for an rsync-aware TCP proxy: an njs script reads the
# module name from the rsync handshake and that name selects the upstream.

# Worker processes run as user "nobody", group "nogroup".
user nobody nogroup;
worker_processes 1;
# PID file and logs use relative paths.
pid nginx.pid;
# "debug" level logging is enabled; the commented-out line below is the
# same log without an explicit level.
error_log error.log debug;
#error_log error.log;
# The stream (TCP) module and its njs scripting module are loaded dynamically.
load_module modules/ngx_stream_module.so;
load_module modnjs/ngx_stream_js_module.so;
events {
worker_connections 768;
# multi_accept on;
}
stream {
# Import the njs module under the name "rsync" and bind three variables to
# functions it exports (negVer, cliVer, moduleName).
js_import rsync from rsync.njs;
js_set $rsync_neg_ver rsync.negVer;
js_set $rsync_cli_ver rsync.cliVer;
js_set $rsync_module rsync.moduleName;
# Access log line: client address, requested module, chosen backend, status,
# byte counters, session time, then client and negotiated protocol versions.
log_format rsync_main '$remote_addr - - [$time_local] "$rsync_module" '
'$rsync_backend $status $bytes_received $bytes_sent $session_time '
'rsync/$rsync_cli_ver rsyncd/$rsync_neg_ver';
server {
listen 12345;
# rsync.preread runs in the preread phase and rsync.filter on the proxied
# data; the backend is chosen through the $rsync_backend map below.
js_preread rsync.preread;
js_filter rsync.filter;
access_log rsync.log rsync_main;
proxy_pass $rsync_backend;
}
# Map the rsync module name to the name of one of the upstream groups below;
# any module not listed goes to "nano".
map $rsync_module $rsync_backend {
foo self;
debian bfsu;
default nano;
}
# NOTE(review): presumably a local rsync daemon listens on 12346 - confirm.
upstream self {
server 127.0.0.1:12346;
}
upstream bfsu {
server mirrors.bfsu.edu.cn:873;
}
upstream nano {
server mirrors.tuna.tsinghua.edu.cn:873;
}
}
Rsync proxy for nginx
/**
 * Pick the protocol version two rsync peers should speak.
 *
 * Each argument is a `{proto, sub}` pair. When both peers announce the same
 * protocol, it is kept only if their sub-protocols also match; otherwise the
 * result drops to `proto - 1`. When the protocols differ, the peer with the
 * lower protocol decides: its protocol is used, reduced by one if that peer
 * announced a non-zero sub-protocol.
 *
 * @param {{proto: number, sub: number}} a - version announced by one peer
 * @param {{proto: number, sub: number}} b - version announced by the other peer
 * @returns {{proto: number, sub: number}} negotiated version; `sub` is 0
 *   unless both peers announced the identical `proto` and `sub`
 */
function protoHandshake(a, b) {
  // Loose comparisons are kept on purpose so results match the historical
  // behaviour for every input type.
  if (a.proto == b.proto) {
    return a.sub != b.sub
      ? {proto: a.proto - 1, sub: 0}
      : {proto: a.proto, sub: a.sub};
  }

  // Protocols differ: the peer with the lower protocol decides.
  const older = a.proto > b.proto ? b : a;
  return {proto: older.sub ? older.proto - 1 : older.proto, sub: 0};
}
/**
 * Read one "\n"-terminated line from `buf`, starting at `startPos`, and
 * return it with every carriage return (0x0d) removed.
 *
 * The line is assembled in a freshly allocated buffer. `Buffer#slice`
 * returns a view that shares memory with `buf`, so compacting the line in
 * place (as this function used to do) silently rewrote the caller's data.
 *
 * @param {Buffer} buf - input bytes; never modified
 * @param {number} startPos - offset at which the line starts
 * @returns {{pos: number, buf?: Buffer}|{readPos: number}}
 *   - `{pos: 0}` when no "\n" is found at or after `startPos`;
 *   - `{pos, buf}` on success, where `pos` is the offset just past the "\n"
 *     and `buf` is the line without CR characters or the terminator;
 *   - `{readPos: -1}` when the line contains a NUL byte.
 *     NOTE(review): this shape has no `pos`/`buf` property, so callers must
 *     check for a missing `buf`; it is kept unchanged for compatibility.
 */
function read_line_old(buf, startPos){
  const nlPos = buf.indexOf("\n", startPos);
  if (nlPos == -1) {
    return {pos: 0};
  }

  const raw = buf.slice(startPos, nlPos);
  // Separate output buffer: `raw` aliases the caller's memory.
  const cleaned = Buffer.alloc(raw.length);
  let cleanedLen = 0;
  for (let i = 0; i < raw.length; i++) {
    const byte = raw[i];
    if (byte == 0) {
      return {readPos: -1};
    }
    if (byte != 0x0d /* '\r' */) {
      cleaned[cleanedLen++] = byte;
    }
  }
  return { pos: nlPos + 1, buf: cleaned.slice(0, cleanedLen) };
}
/**
 * Skip "whitespace" bytes in `buf` starting at `pos`.
 *
 * A byte counts as whitespace when it is a space (0x20) or lies in the
 * range 0x00-0x0d inclusive.
 *
 * @param {Buffer} buf - bytes to scan
 * @param {number} pos - offset to start scanning from
 * @returns {number} offset of the first non-whitespace byte at or after
 *   `pos`, or the offset where scanning stopped because the buffer ended
 */
function eatWhite(buf, pos){
  let cursor = pos;
  for (;;) {
    if (!(cursor < buf.length)) {
      return cursor;
    }
    const byte = buf[cursor];
    const isBlank = byte == 0x20 || (byte >= 0x00 && byte <= 0x0d);
    if (!isBlank) {
      return cursor;
    }
    cursor++;
  }
}
/**
 * Parse an optionally signed decimal integer from `buf`.
 *
 * Leading whitespace (as defined by `eatWhite`) is skipped, then an optional
 * '+' or '-' and a run of ASCII digits are consumed.
 *
 * @param {Buffer} buf - bytes to parse
 * @param {number} pos - offset to start parsing from
 * @returns {{pos: number, num: (number|undefined)}}
 *   `num` is the parsed value, or `undefined` when no digit was found.
 *   `pos` is the offset just past the last digit. When no digit was found it
 *   is the offset of the sign if one was consumed, otherwise the offset where
 *   whitespace skipping stopped. (Previously `pos` was decremented even
 *   without a sign, so it could end up before the scan start, e.g. -1.)
 */
function readInt(buf, pos){
  let cursor = eatWhite(buf, pos);

  const lead = buf[cursor];
  const hasSign = lead == 0x2b /* '+' */ || lead == 0x2d /* '-' */;
  const sign = lead == 0x2d /* '-' */ ? -1 : 1;
  if (hasSign) {
    cursor++;
  }

  let num;
  while (cursor < buf.length && buf[cursor] >= 0x30 /* '0' */ && buf[cursor] <= 0x39 /* '9' */) {
    num = (num === undefined ? 0 : num * 10) + (buf[cursor] - 0x30 /* '0' */);
    cursor++;
  }

  if (num !== undefined) {
    num *= sign;
  } else if (hasSign) {
    // A lone sign is not a number: hand it back to the caller.
    cursor -= 1;
  }
  return {pos: cursor, num};
}
/**
 * Parse an rsync daemon greeting of the form "@RSYNCD: <proto>[.<sub>]".
 *
 * @param {Buffer} buf - one handshake line (without the line terminator)
 * @returns {{proto: (number|undefined), sub: number}}
 *   `proto` is `undefined` when the line does not start with "@RSYNCD:" or
 *   no protocol number follows it. `sub` is -1 when no sub-protocol number
 *   could be read after a '.' immediately following the protocol number.
 */
function parseHSVer(buf){
  const magic = Buffer.from("@RSYNCD:");
  const version = {proto: undefined, sub: -1};

  const hasMagic = magic.length <= buf.length
    && buf.compare(magic, 0, magic.length, 0, magic.length) == 0;
  if (!hasMagic) {
    return version;
  }

  const major = readInt(buf, eatWhite(buf, magic.length));
  version.proto = major.num;

  // The sub-protocol is only looked for directly after "<proto>.".
  if (major.num === undefined || buf[major.pos] != 0x2e /* '.' */) {
    return version;
  }

  const minor = readInt(buf, major.pos + 1);
  if (minor.num !== undefined) {
    version.sub = minor.num;
  }
  return version;
}
/*
console.log(eatWhite(Buffer.from("a"), 0));
console.log(eatWhite(Buffer.from(" "), 0));
console.log(eatWhite(Buffer.from(" "), 0));
console.log(eatWhite(Buffer.from(" a"), 0));
console.log(readInt(Buffer.from("12 a"), 0));
console.log(readInt(Buffer.from("12a"), 0));
console.log(readInt(Buffer.from(" + 12"), 0));
console.log(readInt(Buffer.from(" +12a"), 0));
console.log(readInt(Buffer.from(" -12a"), 0));
console.log(readInt(Buffer.from(" -a"), 0));
console.log(parseHSVer(Buffer.from("@RSYNCD: 1.2")));
console.log(parseHSVer(Buffer.from("@RSYNCD: -1.2")));
console.log(parseHSVer(Buffer.from("@RSYNCD: \t-1. 2")));
console.log(parseHSVer(Buffer.from("@RSYNCD: \t-1 .2")));
console.log(parseHSVer(Buffer.from("@RSYNCD: +1. -2")));
console.log(parseHSVer(Buffer.from("@RSYNCD: + 1. -2")));
console.log(parseHSVer(Buffer.from("@RSYNCD:-1. -2")));
console.log(parseHSVer(Buffer.from("@RSY")));
console.log(read_line_old(Buffer.from("aaaa")));
console.log(read_line_old(Buffer.from("aaaa\nbbbbb\n")));
console.log(read_line_old(Buffer.from("a\ta\raa\r\nbbbbb\n")));
*/
// Protocol version this proxy announces in its own "@RSYNCD:" greeting.
const PROTO_VER = 31;

// Handshake progress markers, in the order they are reached.
const INIT = 0;        // nothing parsed yet
const VER_RECV = 1;    // client greeting parsed
const MODULE_RECV = 2; // requested module name parsed

// Module-level handshake state shared by preread, filter and the js_set
// accessors.
// NOTE(review): presumably each stream session gets its own copy of these
// variables - confirm against the njs runtime in use.
let protoVer;          // negotiated protocol number
let clientVer;         // {proto, sub} announced by the client
let _moduleName = "";  // module requested by the client
let hsSent = 0;        // 1 once our greeting has been sent
let state = INIT;
/**
 * js_preread handler: greet the client, then read its greeting line and the
 * requested module name before the session is allowed to proceed.
 *
 * Side effects: sends "@RSYNCD: <PROTO_VER>.0\n" once (guarded by `hsSent`)
 * and updates the module-level `clientVer`, `protoVer`, `_moduleName` and
 * `state`.
 *
 * The session is denied, after sending an "@ERROR: ..." line, when
 *  - a handshake line is malformed (contains a NUL byte); previously this
 *    case fell through and threw a TypeError while parsing an undefined line,
 *  - the greeting carries no protocol number,
 *  - the client announces protocol 30 without a sub-protocol.
 *
 * @param {object} s - njs stream session object
 */
function preread(s){
  // Offset of the next unread line.
  // NOTE(review): this indexes into `data` across callback invocations, so
  // `data` is assumed to contain all bytes received so far - confirm.
  let readPos = 0;

  if (!hsSent) {
    s.send("@RSYNCD: " + PROTO_VER + ".0\n");
    hsSent = 1;
  }

  s.on('upstream', function(data, flags) {
    while (state == INIT || state == VER_RECV) {
      const t = read_line_old(data, readPos);
      if (t.pos === 0) {
        // No complete line yet: wait for more data.
        return;
      }
      if (t.buf === undefined) {
        // read_line_old signals a malformed line without returning a line.
        s.send("@ERROR: protocol startup error\n");
        return s.deny();
      }
      readPos = t.pos;

      if (state == VER_RECV) {
        _moduleName = t.buf.toString();
        state = MODULE_RECV;
        continue;
      }

      // state == INIT: this line is the client's greeting.
      clientVer = parseHSVer(t.buf);
      if (clientVer.proto === undefined) {
        s.send("@ERROR: protocol startup error\n");
        return s.deny();
      }
      if (clientVer.sub < 0) {
        if (clientVer.proto == 30) {
          s.send("@ERROR: your client is speaking an incompatible beta of protocol 30\n");
          return s.deny();
        }
        clientVer.sub = 0;
      }
      const hsv = protoHandshake(clientVer, {proto: PROTO_VER, sub: 0});
      protoVer = hsv.proto;
      state = VER_RECV;
    }
    // Both lines have been parsed.
    return s.done();
  });
}
// Byte patterns used by filter() to recognise a PROXY protocol preamble in
// the upstream-bound data.
// A v1 preamble starts with the text "PROXY " and ends with CR LF.
const PROXY_V1_HEADER = Buffer.from([0x50, 0x52, 0x4f, 0x58, 0x59, 0x20]);
const PROXY_V1_TRAIL = Buffer.from("\r\n");
// First six bytes compared against the start of a v2 preamble.
const PROXY_V2_HEADER = Buffer.from("\r\n\r\n\0\r");
/**
 * js_filter handler that rewrites the start of both directions of the
 * proxied rsync session.
 *
 * Upstream-bound data: an optional PROXY protocol preamble (v1 or v2) is
 * forwarded unchanged, the first line that follows is replaced with
 * "@RSYNCD: <protoVer>[.0]\n" built from the negotiated version, and the rest
 * of the buffered data is forwarded as-is.
 *
 * Client-bound data: the first line is dropped and the rest is forwarded.
 *
 * Each handler buffers data until it has seen what it needs, then detaches
 * itself with `s.off(...)`.
 * NOTE(review): presumably later chunks then pass through unmodified -
 * confirm against the njs stream documentation.
 *
 * Fix: the PROXY v2 length field is an unsigned 16-bit integer. It used to
 * be read as signed, so preambles of 32768 bytes or more produced a negative
 * length and corrupted the stream.
 *
 * @param {object} s - njs stream session object
 */
function filter(s){
  let uploadBuf = Buffer.alloc(0);
  let downloadBuf = Buffer.alloc(0);
  // 0: not yet detected, 1 / 2: PROXY v1 / v2 preamble pending,
  // -1: no preamble, or preamble already forwarded.
  let proxyProtoVer = 0;

  s.on('upstream', function(data, flags) {
    uploadBuf = Buffer.concat([uploadBuf, data]);

    if (proxyProtoVer == 0) {
      if (uploadBuf.length < 6) {
        return;
      }
      const header = uploadBuf.slice(0, 6);
      if (header.equals(PROXY_V1_HEADER)) {
        proxyProtoVer = 1;
      } else if (header.equals(PROXY_V2_HEADER)) {
        proxyProtoVer = 2;
      } else {
        proxyProtoVer = -1;
      }
    }

    if (proxyProtoVer == 1) {
      // v1 preamble is a single text line terminated by CR LF.
      const endPos = uploadBuf.indexOf(PROXY_V1_TRAIL);
      if (endPos == -1) {
        return;
      }
      s.send(uploadBuf.slice(0, endPos + 2));
      uploadBuf = uploadBuf.slice(endPos + 2);
      proxyProtoVer = -1;
    }

    if (proxyProtoVer == 2) {
      // v2 preamble: 16-byte fixed header whose last two bytes hold the
      // length of the variable part (unsigned, big-endian).
      if (uploadBuf.length < 16) {
        return;
      }
      const len = uploadBuf.readUInt16BE(14);
      if (uploadBuf.length < 16 + len) {
        return;
      }
      s.send(uploadBuf.slice(0, 16 + len));
      uploadBuf = uploadBuf.slice(16 + len);
      proxyProtoVer = -1;
    }

    const t = read_line_old(uploadBuf, 0);
    if (t.pos == 0) {
      return;
    }
    // Replace the first line with a greeting for the negotiated version.
    s.send("@RSYNCD: " + protoVer + (protoVer < 30 ? "" : ".0") + "\n");
    s.send(uploadBuf.slice(t.pos), {last: flags.last});
    s.off('upstream');
  });

  s.on('downstream', function(data, flags) {
    downloadBuf = Buffer.concat([downloadBuf, data]);
    const t = read_line_old(downloadBuf, 0);
    if (t.pos == 0) {
      return;
    }
    // Drop the first line and forward whatever follows it.
    s.send(downloadBuf.slice(t.pos), {last: flags.last});
    s.off('downstream');
  });
}
/**
 * js_set accessor: the module name captured during preread.
 *
 * @param {object} s - stream session object (unused)
 * @returns {string} the requested module name, "" if none was read yet
 */
function moduleName(s) {
  const requested = _moduleName;
  return requested;
}
/**
 * js_set accessor: the negotiated protocol version formatted for logging.
 *
 * @param {object} s - stream session object (unused)
 * @returns {string} "-" when no version was negotiated; otherwise the
 *   protocol number, with ".0" appended unless it is below 30
 */
function negVer(s) {
  if (protoVer === undefined) {
    return "-";
  }
  const suffix = protoVer < 30 ? "" : ".0";
  return protoVer + suffix;
}
/**
 * js_set accessor: the version announced by the client, formatted for
 * logging.
 *
 * @param {object} s - stream session object (unused)
 * @returns {string} "-" when no client greeting was parsed; otherwise
 *   "<proto>.<sub>"
 */
function cliVer(s) {
  if (clientVer === undefined) {
    return "-";
  }
  const {proto, sub} = clientVer;
  return proto + "." + sub;
}
export default {preread, filter, moduleName, negVer, cliVer};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment