Last active
January 24, 2020 23:54
-
-
Save LNBIG-COM/90f143c8ba61b848eb33dc4253a90425 to your computer and use it in GitHub Desktop.
Rebalance script through outside network (not all modules but the core code). With russian-languiage comments
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright (c) 2019 LNBIG.com | |
* All rights reserved. | |
*/ | |
const PromisePool = require('es6-promise-pool') | |
const util = require('util'); | |
const nodeStorage = require('../global/nodeStorage'); | |
const {Mutex} = require('await-semaphore') | |
const debug = require('debug')('rebalance') | |
const debugPay = debug.extend('pay') | |
const { myNodes } = require('../global/myNodes') | |
const { | |
MIN_LOCAL_PART, | |
MAX_LOCAL_PART, | |
MINIMUM_FOR_BALANCE, | |
} = require('./constants') | |
const _ = require('lodash') | |
const pTimeout = require('p-timeout') | |
const listChannels = require('../lib/listChannels') | |
const getInfo = require('../lib/getInfo') | |
const AMOUNT_ROUNDS = 5 | |
const ROUND_FACTOR = 1.5 | |
const SEND_PAYMENT_OK = 1 | |
const SEND_PAYMENT_AGAIN = 2 | |
const SEND_PAYMENT_STOP = 3 | |
const SEND_PAYMENT_STOP_TIMEOUT = 4 | |
const SEND_PAYMENT_NO_ROUTE = 5 | |
// По 4 платежа в потоке | |
const SEND_PAYMENT_CONCURRENCY = 4 | |
const INVOICE_CLTV = 10 | |
const SEND_TIMEOUT = 20000 /* milliseconds*/ | |
const TYPE_RECEIVER = 1 | |
const TYPE_SENDER = 2 | |
// Баним узел, если это или более количества фейлов | |
const BAN_NODE_AFTER_N_FAILS = 3 | |
// Баним локальный канал, если столько неудачных SEND_PAYMENT_NO_ROUTE ошибок | |
const BAN_LOCAL_CHAN_ID_AFTER_N_FAILS = 3 | |
//const waitNextTick = util.promisify(setImmediate) | |
const waitSomeTime = util.promisify(setTimeout) | |
/* | |
* Используется для всех типов каналов - наших и чужих | |
* */ | |
class Edge { | |
constructor ({chanId, capacity = 0, grpcClient = null}) { | |
this.chanId = chanId | |
this.capacity = +capacity | |
this.grpcClient = grpcClient | |
this.node1Pubkey = this.node2Pubkey = null | |
this.node1Policy = this.node2Policy = null | |
this.isPolicy = false | |
} | |
async policies() { | |
if (! this.grpcClient) | |
throw new Error('Edge::policies - не определён grpcClient') | |
let res = await this.grpcClient.getChanInfo({chan_id: this.chanId}) | |
this.node1Pubkey = res.node1_pub | |
this.node2Pubkey = res.node2_pub | |
this.node1Policy = res.node1_policy | |
this.node2Policy = res.node2_policy | |
this.capacity = +res.capacity | |
this.isPolicy = true | |
} | |
policy(pubKey) { | |
if (this.node1Pubkey === pubKey) | |
return this.node1Policy | |
else if (this.node2Pubkey === pubKey) | |
return this.node2Policy | |
else | |
throw new Error(`Edge::policy - такого не должено быть! (${pubKey}) - ${util.inspect(this)}`) | |
} | |
get node1PubkeyBin() { | |
return Buffer.from(this.node1Pubkey, 'hex') | |
} | |
get node2PubkeyBin() { | |
return Buffer.from(this.node2Pubkey, 'hex') | |
} | |
} | |
/* | |
* Используется только для внутренних каналов | |
* */ | |
class LocalEdge extends Edge { | |
constructor (key, channel) { | |
super({chanId: channel.chan_id, capacity: channel.capacity, grpcClient: nodeStorage.nodes[key].client}) | |
this.channel = channel | |
this.key = key | |
this.localPubkey = nodeStorage.nodes[key].pubKey | |
this.remotePubkey = channel.remote_pubkey | |
this.commitFee = +channel.commit_fee | |
this.cleanCapacity = this.capacity - this.commitFee | |
this.localBalance = +this.channel.local_balance | |
this.target = { min: Math.round(this.cleanCapacity * MIN_LOCAL_PART), max: Math.round(this.cleanCapacity * MAX_LOCAL_PART) } | |
this.type = this.localBalance < this.target.min ? TYPE_RECEIVER : TYPE_SENDER | |
this.minReceive = this.target.min - this.localBalance | |
this.maxReceive = this.target.max - this.localBalance | |
this.minSend = this.localBalance - this.target.max | |
this.maxSend = this.localBalance - this.target.min | |
/* Канал считается разбалансированным, когда границы localBalance значительно выходят за пределы сбалансированного канала | |
* (чтобы понапрасну не гонять туда сюда средства между каналами "на грани") */ | |
this.unbalanced | |
= this.localBalance < (this.cleanCapacity * MIN_LOCAL_PART * 0.5) | |
|| this.localBalance > (this.cleanCapacity * (MAX_LOCAL_PART / 2 + 1 / 2) ) | |
this.localPolicy = this.remotePolicy = null | |
} | |
async policies() { | |
await super.policies() | |
if (this.node1Pubkey === this.channel.remote_pubkey) { | |
this.localPolicy = this.node2Policy | |
this.remotePolicy = this.node1Policy | |
} | |
else { | |
this.localPolicy = this.node1Policy | |
this.remotePolicy = this.node2Policy | |
} | |
} | |
get localPubkeyBin() { | |
return Buffer.from(this.localPubkey, 'hex') | |
} | |
get remotePubkeyBin() { | |
return Buffer.from(this.remotePubkey, 'hex') | |
} | |
} | |
class Rebalancer { | |
constructor (options) { | |
this.options = this.constructorOptions = options | |
this.permanentIgnoredNodes = {} | |
this.permanentIgnoredLocalChanId = {} // Если receiver или sender с chanId часто получают SEND_PAYMENT_NO_ROUTE, тогда они каждый могут попасть сюда | |
this.permanentIgnoredEdges = {} // Ключи - "chan_id-0/1" , где 0 - если from < to по ascii (lexicographic order) упорядочиванию node id | |
this.okAmntPayments = 0 | |
this.failedAmntPayments = 0 | |
this.timedOutAmntPayments = 0 | |
this.noRouteAmntPayments = 0 | |
this.totalPaidFees = 0 | |
this.concurrency = this.options.concurrency | |
} | |
resetIgnores() { | |
/* Игнорируемые каналы, список которых определяется на основе неудачных платежей */ | |
debug('resetIgnores...') | |
this.ignoredPendingChannels = {} | |
this.ignoredPendingChannelsLocks = {} | |
this.ignoreNodes = {} | |
this.ignoredEdges = {} // Ключи - "chan_id-0/1" , где 0 - если from < to по ascii (lexicographic order) упорядочиванию node id | |
this.ignoredLocalChanId = {} | |
} | |
async addToIgnore(edge) { | |
if (edge instanceof LocalEdge) { | |
if (!this.localIgnoredEdges.has(edge)) | |
this.localIgnoredEdges.set(edge, [{from: edge.localPubkeyBin, to: edge.remotePubkeyBin}, {from: edge.remotePubkeyBin, to: edge.localPubkeyBin}]) | |
} | |
else { | |
/* В LND есть баг - есть pending каналы, которые удалённые узлы уже считают открытыми (channel point уже в блокчейн) | |
* Но сам LND, который эти каналы открыл - считает их pending, пока его не перезапустишь | |
* Но при этом запрос на поиск маршрута (QueryRoutes) включает эти каналы в маршруты, так как они есть в графе | |
* Чтобы избавиться от таких казусов, приходится вести ещё этот список */ | |
if (! this.ignoredPendingChannels[edge.chanId] && ! this.ignoredPendingChannelsLocks[edge.chanId]) { | |
this.ignoredPendingChannelsLocks[edge.chanId] = 1 | |
debug("ВНИМАНИЕ! Зависший pending канал (%s), который другими узлами транслируется в граф (lnd bug) - в игнор его!", edge.chanId) | |
try { | |
await edge.policies() | |
let item = this.ignoredPendingChannels[edge.chanId] = [{from: edge.node1PubkeyBin, to: edge.node2PubkeyBin}, {from: edge.node2PubkeyBin, to: edge.node1PubkeyBin}] | |
debug("Pending канал (%s) добавлен в игнор как: %s", edge.chanId, util.inspect(item, false, 3)) | |
} | |
finally { | |
this.ignoredPendingChannelsLocks[edge.chanId] = 0 | |
} | |
} | |
} | |
} | |
async readChannelInfo() { | |
this.listChannels = listChannels(nodeStorage, {}) | |
this.getInfo = getInfo(nodeStorage) | |
debug('Запускаются асинхронные команды...') | |
this.listChannels = await this.listChannels | |
this.getInfo = await this.getInfo | |
debug('Данные получены полностью, обработка') | |
this.localIgnoredEdges = new Map() | |
} | |
async run(options = this.constructorOptions) { | |
// The number of promises to process simultaneously. | |
this.options = options | |
await this.readChannelInfo() | |
await this.findCandidates() | |
await this.rebalancing() | |
} | |
optimalAmount(receiver, sender, tolerance) { | |
let min = Math.max(receiver.minReceive, sender.minSend) | |
let max = Math.min(receiver.maxReceive, sender.maxSend) | |
let res = null | |
if (min < max) | |
res = max | |
else if (tolerance) { | |
/* Если Rmax < Smin, но оба довольно большие - можно перекинуть Rmax | |
* Если Rmax > Smin, но оба довольно большие - можно перекинуть Smax */ | |
if (receiver.maxReceive < sender.minSend && sender.minSend / receiver.maxReceive <= tolerance) | |
res = receiver.maxReceive | |
else if (receiver.minReceive > sender.maxSend && receiver.minReceive / sender.maxSend <= tolerance) | |
res = sender.maxSend | |
} | |
debug("Rmin=%d, Rmax=%d, Smin=%d, Smax=%d, optimalAmount=%d (tolerance=%d), receiver=%s, sender=%s", receiver.minReceive, receiver.maxReceive, sender.minSend, sender.maxSend, res, tolerance, receiver.chanId, sender.chanId) | |
return res | |
} | |
/* factor теперь используется, если мы разбиваем платёж - тогда factor будет от 0 до 1 (например 1/4 при 4-х платежах)*/ | |
getMaxFeeMsats(amountSats, factor = 1) { | |
return Math.floor(amountSats * 1000 * this.options.feeRate / 1E6 + this.options.feeBase * factor ) | |
} | |
makeIgnoredNodes() { | |
let list = Object.keys( | |
Object.entries(this.ignoreNodes).reduce( | |
(acc, val) => { | |
if (val[1] >= BAN_NODE_AFTER_N_FAILS) | |
acc[val[0]] = 1 | |
return acc | |
}, | |
{...this.permanentIgnoredNodes} | |
) | |
).map( v => Buffer.from(v, 'hex')) | |
debug("makeIgnoredNodes length now is %d", list.length) | |
return list | |
} | |
mergeIgnoredNodesWithPermanentOnes() { | |
debug("mergeIgnoredNodesWithPermanentOnes before, ignoreNodes=%o", this.ignoreNodes) | |
Object.entries(this.ignoreNodes).forEach( | |
(val) => { | |
if (val[1] >= BAN_NODE_AFTER_N_FAILS) | |
this.permanentIgnoredNodes[val[0]] = 1 | |
} | |
) | |
debug("mergeIgnoredNodesWithPermanentOnes=%o", this.permanentIgnoredNodes) | |
} | |
mergeIgnoredLocalChanIdWithPermanentOnes() { | |
debug("mergeIgnoredLocalChanIdWithPermanentOnes before, ignoredLocalChanId: %o", this.ignoredLocalChanId) | |
Object.entries(this.ignoredLocalChanId).forEach( | |
(val) => { | |
if (val[1] >= BAN_LOCAL_CHAN_ID_AFTER_N_FAILS) | |
this.permanentIgnoredLocalChanId[val[0]] = 1 | |
} | |
) | |
debug("mergeIgnoredLocalChanIdWithPermanentOnes=%o", this.permanentIgnoredLocalChanId) | |
} | |
async findRoute(receiver, sender, {pathTitle, lastHopAmount, lastHopFeeMsat, maxFeeMsats, amount}) { | |
if (this.options.dryRun) | |
return null | |
// Бывает, что в ходе отправок платежей предпоследний узел попадает в игнорируемые | |
// (например он ушёл в оффлайн) - для такого перестаём искать маршруты | |
if (this.permanentIgnoredNodes[receiver.remotePubkey]) | |
return null | |
try { | |
let req = { | |
pub_key: receiver.channel.remote_pubkey, // Чтобы расчитать маршрут, используем конечный узел как предпоследний | |
amt: lastHopAmount, | |
final_cltv_delta: receiver.localPolicy.time_lock_delta + receiver.remotePolicy.time_lock_delta + 3, // +3 - это на всякий случай даю фору 2 блока... | |
fee_limit: {fixed: Math.ceil((maxFeeMsats - lastHopFeeMsat) / 1000)}, | |
ignored_pairs: this.makeIgnoredPairs(receiver, sender), | |
ignored_nodes: this.makeIgnoredNodes() | |
} | |
debug("Поиск маршрутов (%s) - делаем запрос, sats: %d", pathTitle, req.amt) | |
//console.time('Поиск маршрутов') | |
let res = await nodeStorage.nodes[sender.key].client.queryRoutes(req) | |
//console.timeEnd('Поиск маршрутов') | |
debug("Маршрут(ы) найден(ы) (%s) - %s", pathTitle, util.inspect(res,false,4)) | |
if (res.routes.length > 1) | |
debug("Найдено несколько маршрутов (%s): res: %o", pathTitle, res) | |
let route = res.routes[0] | |
if (route.hops[0].chan_id !== sender.chanId) { | |
console.warn(`Первый канал (${pathTitle}) (${route.hops[0].chan_id}) должен быть равен sender (${sender.chanId}) каналу! Пробуем снова!`) | |
let edge = new Edge({chanId: route.hops[0].chan_id, grpcClient: nodeStorage.nodes[sender.key].client}) | |
await this.addToIgnore(edge) | |
return null | |
} | |
// https://github.com/lightningnetwork/lnd/issues/3712#issuecomment-553297703 | |
// Not more 20 hops | |
if (route.hops.length > 19) | |
return null | |
route.hops.push({ | |
chan_id: receiver.chanId, | |
chan_capacity: receiver.channel.capacity, | |
amt_to_forward_msat: amount * 1000, | |
fee_msat: 0, | |
expiry: route.hops[route.hops.length - 1].expiry - receiver.remotePolicy.time_lock_delta, | |
pub_key: receiver.localPubkey | |
}) | |
await this.recalculateRoute(receiver, route, nodeStorage.nodes[sender.key].client) | |
debug("Сравнение комиссий пересчитанного маршрута (%s) (%d) и допустимых (%d)", pathTitle, route.total_fees_msat, maxFeeMsats) | |
if (route.total_fees_msat <= maxFeeMsats) { | |
debugPay("Маршрут после изменения (%s): %s", pathTitle, util.inspect(route,false,4)) | |
return route | |
} | |
return null | |
} | |
catch (e) { | |
debug("Поиск маршрутов (%s) - ошибка: %s", pathTitle, e.message) | |
return null | |
} | |
} | |
async createInvoice(receiver, sender, {route, pathTitle}) { | |
let amountSats = Math.floor((+route.total_amt_msat - +route.total_fees_msat) / 1000) | |
let memo = `Rebalance from ${sender.key} to ${receiver.key} ${amountSats} sats` | |
debugPay("Создание инвойса - (%s): %s", pathTitle, memo) | |
if (! this.options.dryRun) { | |
let res = await receiver.grpcClient.addInvoice({ | |
memo, | |
value: amountSats, | |
expiry: Math.round((SEND_TIMEOUT / 1000) + this.options.attempts * 10), | |
cltv_expiry: INVOICE_CLTV | |
}) | |
let decodedPayReq = await sender.grpcClient.decodePayReq({pay_req: res.payment_request}) | |
debug("Декодированный инвойс: %o", decodedPayReq) | |
return decodedPayReq | |
} | |
else { | |
return null | |
} | |
} | |
async considerChance(receiver, sender, amount) { | |
// debug("Ставим pending блокировку, receiver: %s, sender: %s", receiver.chanId, sender.chanId) | |
// Ставим pending блокировку, чтобы не обрабатывать их в другом потоке | |
let opts | |
/* | |
if (! (opts = this.getOptsForPartlyPayment({receiver, sender}, amount)).feeOK) { | |
// TODBG - проверить потом - не должно исполняться | |
debug( | |
"ВНИМАНИЕ! Не должно произойти, так как проверка была ранее, но fee последнего хопа ( %d msats ) будет выше целевой (%d msats) - аннулируем балансировку (%s), amount: %d, remotePolicy=%o", | |
opts.lastHopFeeMsat, | |
opts.maxFeeMsats, | |
opts.pathTitle, | |
amount, | |
receiver.remotePolicy | |
) | |
let release = await this.sendersMutex.acquire() | |
this.senders.push(sender) | |
release() | |
return | |
} | |
*/ | |
opts = this.getOptsForPartlyPayment({receiver, sender}, amount) | |
debug("considerChance: пробуем отправить платежи (%s), opts: %o", opts.pathTitle, opts) | |
let payments = await this.sendMultiPayment({receiver, sender, fullAmount: amount, opts}) | |
if (payments.filter(v => v.status === SEND_PAYMENT_OK).length > 0) { | |
// Если платёж прошёл - теперь эти receiver & sender использоваться не будут | |
debug("Платежи (%s) прошли частично: %o", opts.pathTitle, payments) | |
if (this.startOutputDots) | |
process.stdout.write('X') | |
return true | |
} | |
else { | |
// Неудачная балансировка - снова засовываем элементы в массивы для обработки | |
let release | |
debug("Балансировка не удалась (%s) - возвращаем sender обратно", opts.pathTitle) | |
release = await this.sendersMutex.acquire() | |
this.senders.push(sender) | |
release() | |
return false | |
} | |
} | |
async sendMultiPayment(args) { | |
let {fullAmount} = args | |
let payments = [] | |
let id = 1, amount | |
for (; fullAmount > 0; fullAmount -= this.options.maxBalancedPayment, id++) { | |
amount = fullAmount > this.options.maxBalancedPayment ? this.options.maxBalancedPayment : fullAmount | |
payments.push({id, amount, status: null}) | |
} | |
await this.sendPaymentsInParallel(payments, args) | |
return payments | |
} | |
async sendPaymentsInParallel(payments, args) { | |
// Отправляем максимум по 4 платежа параллельно | |
let pool = new PromisePool(this.sendOnePaymentGenerator(payments, args), SEND_PAYMENT_CONCURRENCY) | |
pool.addEventListener('fulfilled', function () { | |
}) | |
pool.addEventListener('rejected', function (event) { | |
console.error('sendPaymentsInParallel: error: %o: ', event.data.error.message) | |
}) | |
debugPay(`Отправка платежей (${payments.length}) в потоке (${args.opts.pathTitle})`) | |
// Start the pool. | |
await pool.start() | |
debugPay(`Отправка платежей (${payments.length}) завершена (${args.opts.pathTitle})`) | |
//console.log(`Поиск маршрутов завершён.\nУспешных платежей: ${this.okAmntPayments}\nНеуспешных платежей: ${this.failedAmntPayments}\nЗатрачено: ${Math.floor(this.totalPaidFees / 1000)} sats`) | |
} | |
* sendOnePaymentGenerator(payments, args) { | |
for (let payment of payments) { | |
let opts = this.getOptsForPartlyPayment(args, payment.amount, payment.id, payments.length) | |
if (opts.feeOK) { | |
yield ( async () => { | |
await waitSomeTime(Math.floor(Math.random() * 5000)) | |
debug("Payment[%d] (%s), payment: %o, opts before: %o, opts after: %o", payment.id, args.opts.pathTitle, payment, args.opts, opts) | |
payment.status = await this.createInvoiceAndPay({...args, opts}); | |
if (payment.status !== SEND_PAYMENT_OK) { | |
if (payment.status === SEND_PAYMENT_NO_ROUTE) { | |
debug("Не найдены маршруты (%s) для платежа [%d]", args.opts.pathTitle, payment.id) | |
this.ignoredLocalChanId[args.sender.chanId] = (this.ignoredLocalChanId[args.sender.chanId] || 0) + 1 | |
this.ignoredLocalChanId[args.receiver.chanId] = (this.ignoredLocalChanId[args.receiver.chanId] || 0) + 1 | |
debug("ignoredLocalChanId: %o", this.ignoredLocalChanId) | |
this.noRouteAmntPayments++ | |
} | |
else { | |
payment.status === SEND_PAYMENT_STOP_TIMEOUT ? this.timedOutAmntPayments++ : this.failedAmntPayments++ | |
} | |
} | |
})() | |
} | |
} | |
} | |
/* | |
* Расчитывает комиссию последнего "хопа", максимальную комиссию на основе целевых параметров | |
* и вычисляет сумму, которую надо будет выставить в инвойсе | |
* Также определяет: является ли итоговая комиссия последнего хопа выше целевой | |
* (позже расчёта маршрута будет вычислена новая комиссия, которая снова будет сверена с целевой - эта нужна, | |
* чтобы сразу отсечь варианты, если последний хоп берёт больше, чем нам нужно) | |
* */ | |
getOptsForPartlyPayment({receiver, sender}, amount, paymentNumber = 1, paymentsAmount = 1) { | |
// Расчитываем fee для последнего канала | |
let opts = { | |
pathTitle: `${sender.key}->${sender.chanId}->${receiver.chanId}->${receiver.key} [${paymentNumber}/${paymentsAmount}]`, | |
maxFeeMsats: this.getMaxFeeMsats(amount, 1 / paymentsAmount), | |
lastHopFeeMsat: Math.floor(amount * 1000 * +receiver.remotePolicy.fee_rate_milli_msat / 1E6 + +receiver.remotePolicy.fee_base_msat), | |
amount | |
} | |
opts.lastHopAmount = Math.floor(opts.lastHopFeeMsat / 1000 + amount) | |
opts.feeOK = ! (opts.lastHopFeeMsat > opts.maxFeeMsats) | |
return opts | |
} | |
feeNotExceed(receiver, factor = 1) { | |
return +receiver.remotePolicy.fee_rate_milli_msat <= this.options.feeRate && +receiver.remotePolicy.fee_base_msat <= this.options.feeBase * factor | |
} | |
async createInvoiceAndPay({receiver, sender, opts }) { | |
debug("createInvoiceAndPay, opts: %o", opts) | |
let route = await this.findRoute(receiver, sender, opts) | |
let status | |
if (route) { | |
let payReq = await this.createInvoice(receiver, sender, {route, pathTitle: opts.pathTitle}) | |
let i | |
for (i = 0; i < this.options.attempts && route; i++) { | |
// Делаем несколько попыток оплаты | |
if ((status = await this.sendPayment(receiver, sender, {route, payReq, pathTitle: opts.pathTitle})) !== SEND_PAYMENT_AGAIN) | |
break | |
route = await this.findRoute(receiver, sender, opts) | |
} | |
} | |
return route ? status : SEND_PAYMENT_NO_ROUTE | |
} | |
async sendPayment(receiver, sender, {payReq, route, pathTitle}) { | |
if (! this.options.dryRun) { | |
try { | |
let resPayment = await pTimeout( | |
sender.grpcClient.Router.sendToRoute({ | |
payment_hash: Buffer.from(payReq.payment_hash, 'hex'), | |
route | |
}), | |
SEND_TIMEOUT | |
) | |
debugPay("Результат оплаты канала (%s): %o", pathTitle, resPayment) | |
if (resPayment.failure) { | |
let error = resPayment.failure | |
debugPay("Ошибка оплаты (%s) инвойса (%s), хопов: %d, ошибка: %s", pathTitle, payReq.description, route.hops.length, error.code); | |
// Добавляем в игнор проблемную edge | |
let sourceFailurePubkey = error.failure_source_index ? route.hops[error.failure_source_index - 1].pub_key : sender.localPubkey | |
let nextFailurePubkey = route.hops[error.failure_source_index].pub_key | |
let chanId | |
switch (error.code) { | |
case 'TEMPORARY_CHANNEL_FAILURE': | |
chanId = route.hops[error.failure_source_index].chan_id | |
this.ignoredEdges[`${chanId}-${+(sourceFailurePubkey > nextFailurePubkey)}`] = [{from: Buffer.from(sourceFailurePubkey, 'hex'), to: Buffer.from(nextFailurePubkey, 'hex')}] | |
this.ignoreNodes[sourceFailurePubkey] = (this.ignoreNodes[sourceFailurePubkey] || 0) + 1 | |
debugPay("Добавляем в игнор (%s), %s, %s->%s, [%s]", pathTitle, chanId, sourceFailurePubkey, nextFailurePubkey, `${chanId}-${+(sourceFailurePubkey > nextFailurePubkey)}`) | |
return SEND_PAYMENT_AGAIN | |
case 'UNKNOWN_NEXT_PEER': | |
this.permanentIgnoredNodes[nextFailurePubkey] = 1 | |
debugPay("Добавляем в игнор узел %s, так как он в оффлайн", nextFailurePubkey) | |
return SEND_PAYMENT_AGAIN | |
case 'TEMPORARY_NODE_FAILURE': | |
case 'UNKNOWN_FAILURE': | |
case 'EXPIRY_TOO_FAR': | |
case 'PERMANENT_NODE_FAILURE': | |
this.permanentIgnoredNodes[sourceFailurePubkey] = 1 | |
debugPay("Добавляем в игнор узел %s, так как он временно не работает", sourceFailurePubkey) | |
return SEND_PAYMENT_AGAIN | |
case 'FEE_INSUFFICIENT': | |
debugPay("Комиссии одного из узлов (%s) изменились, пробуем снова", sourceFailurePubkey) | |
this.ignoreNodes[sourceFailurePubkey] = (this.ignoreNodes[sourceFailurePubkey] || 0) + 1 | |
return SEND_PAYMENT_AGAIN | |
default: | |
debugPay("Неизвестная ошибка (%s), решить как её обрабатывать, error: %o", error.code, error) | |
return SEND_PAYMENT_STOP | |
} | |
} | |
else { | |
this.totalPaidFees += route.total_fees_msat | |
debugPay("Инвойс (%s) успешно оплачен (fee: %d sats)! Хопов: %d (%s). Всего уже заплачено: %d sats", payReq.description, Math.floor(route.total_fees_msat / 1000), route.hops.length, pathTitle, Math.floor(this.totalPaidFees / 1000)) | |
this.okAmntPayments++ | |
return SEND_PAYMENT_OK | |
} | |
} | |
catch (e) { | |
debugPay("Оплата канала (%s) - таймаут. Оставляем всё как есть и продолжаем дальше (%s)", payReq.description, pathTitle) | |
return SEND_PAYMENT_STOP_TIMEOUT | |
} | |
} | |
else { | |
debugPay("Эмуляция отправки (%s), receiver: %o, sender: %o, route: %o", pathTitle, receiver, sender, route) | |
return SEND_PAYMENT_OK | |
} | |
} | |
async recalculateRoute(receiver, route, grpcClient) { | |
debug("Начата рекалькуляция маршрута, receiver=%o, route=%o", receiver, route) | |
route.hops[route.hops.length - 2].expiry = route.hops[route.hops.length - 1].expiry | |
let nextAmount = +route.hops[route.hops.length - 1].amt_to_forward_msat | |
let totalFeeMsat = 0 | |
for (let i = route.hops.length - 2; i >= 0; i--) { | |
delete route.hops[i].fee | |
let policy | |
if (route.hops.length - 2 === i) { | |
policy = receiver.remotePolicy | |
} | |
else { | |
let edgeNext = new Edge({chanId: route.hops[i + 1].chan_id, grpcClient}) | |
await edgeNext.policies() | |
policy = edgeNext.policy(route.hops[i].pub_key) | |
} | |
if (! policy) | |
throw new Error(`Неизвестны policy для узла ${route.hops[i].pub_key} канала ${route.hops[i].chan_id}`) | |
route.hops[i].fee_msat = Math.floor(nextAmount * +policy.fee_rate_milli_msat / 1000000 + +policy.fee_base_msat) | |
totalFeeMsat += route.hops[i].fee_msat | |
route.hops[i].amt_to_forward_msat = nextAmount | |
delete route.hops[i].amt_to_forward | |
nextAmount = nextAmount + route.hops[i].fee_msat | |
} | |
route.total_amt_msat = nextAmount | |
route.total_fees_msat = totalFeeMsat | |
delete route.total_amt | |
delete route.total_fees | |
debug("Конец рекалькуляции маршрута, receiver=%o, route=%o", receiver, route) | |
} | |
makeIgnoredPairs(receiver, sender) { | |
let ignore = [] | |
this.localIgnoredEdges.forEach((array, edge) => { | |
if (edge !== receiver && edge !== sender) | |
ignore.push(...array) | |
else if (edge === receiver) | |
ignore.push(array[0]) | |
else if (edge === sender) | |
ignore.push(array[1]) | |
}) | |
Object.values(this.ignoredPendingChannels).forEach(array => ignore.push(...array)) | |
Object.values({...this.permanentIgnoredEdges, ...this.ignoredEdges}).forEach(array => ignore.push(...array)) | |
return ignore | |
} | |
mergeIgnoredEdgesWithPermanent() { | |
debug("mergeIgnoredEdgesWithPermanent before: %o", this.ignoredEdges) | |
Object.entries(this.ignoredEdges).forEach(v => this.permanentIgnoredEdges[v[0]] = v[1]) | |
} | |
async findCandidates() { | |
/* Эти пары нужны для того, чтобы не оптимизировать каналы, которые лучше не оптимизировать | |
Например: наш узел A имеет с узлом B два канала - один sender, другой receiver | |
Если сбалансировать оба канала, то с одной стороны это хорошо, | |
но с другой снижает прохождение крупного платежа в одном направлении. | |
Учитывая, что узлы могут отправлять платёж через другой канал для того же удалённого узла, | |
лучше иметь два несбалансированных канала с обеих сторон, чем два сбалансированных. | |
nodePairs как раз призвана найти такие каналы и убрать из кандидатов | |
* */ | |
this.candidates = [] | |
let nodePairs = {} | |
let amntReceivers = 0, amntSenders = 0, amntDeletedReceiversOrSenders = 0 | |
for (let key in nodeStorage.nodes) { | |
if (nodeStorage.nodes[key].client) { | |
for (let channel of this.listChannels[key].channels) { | |
// Канал не с моим узлом... | |
let edge = new LocalEdge(key, channel) | |
//debug("edge объект: %o", edge) | |
if (! myNodes[channel.remote_pubkey] && ! edge.channel.private && edge.channel.active && edge.unbalanced) { | |
let nodePairKey = `${edge.localPubkey}-${edge.remotePubkey}` | |
if (! nodePairs[nodePairKey]) | |
nodePairs[nodePairKey] = [] | |
nodePairs[nodePairKey].push(edge) | |
} | |
await this.addToIgnore(edge) | |
} | |
} | |
else { | |
throw new Error('Все узлы должны работать!') | |
} | |
} | |
for (let arr of Object.values(nodePairs)) { | |
if (arr.length > 1) { | |
// Определяем какие каналы убрать из кандидатов, сортируем в первую очередь по размерам балансов, | |
// чтобы удалить потом в первую очередь крупные каналы | |
let senders = arr.filter( v => v.type == TYPE_SENDER).sort((a, b) => a.localBalance - b.localBalance) | |
let receivers = arr.filter( v => v.type == TYPE_RECEIVER).sort((a, b) => b.localBalance - a.localBalance) | |
let amnt = Math.min(receivers.length, senders.length) | |
if (amnt > 0) { | |
receivers.splice(0, amnt) | |
senders.splice(0, amnt) | |
amntDeletedReceiversOrSenders += amnt * 2 | |
arr = [...receivers, ...senders] | |
} | |
} | |
for (let edge of arr) { | |
await edge.policies() | |
// Требует ребалансировки | |
//debug("Помещаем edge в список кандидатов, edge: %o", edge) | |
if (edge.localPolicy && edge.remotePolicy) { | |
edge.type === TYPE_SENDER ? amntSenders++ : amntReceivers++ | |
this.candidates.push(edge) | |
} | |
} | |
} | |
console.log("Количество receivers: %d, senders: %d, удалённых receivers/senders: %d", amntReceivers, amntSenders, amntDeletedReceiversOrSenders) | |
} | |
isBannedLocalChanId(receivedOrSender) { | |
return (this.ignoredLocalChanId[receivedOrSender.chanId] && this.ignoredLocalChanId[receivedOrSender.chanId] >= BAN_LOCAL_CHAN_ID_AFTER_N_FAILS) | |
|| this.permanentIgnoredLocalChanId[receivedOrSender.chanId]; | |
} | |
async considerChanceForReceiver(receiver, backArray, tolerance = 0) { | |
let release, sender, amount | |
if (! this.feeNotExceed(receiver)) { | |
debug( | |
"Fee последнего хопа превышает заданную fee: rate %d против %d, base %d против %d, receiver: %o", | |
+receiver.remotePolicy.fee_rate_milli_msat, | |
this.options.feeRate, | |
+receiver.remotePolicy.fee_base_msat, | |
this.options.feeBase, | |
receiver | |
) | |
return | |
} | |
let tried = new Set() | |
for (;;) { | |
if (this.isBannedLocalChanId(receiver)) { | |
backArray.push(receiver) | |
if (this.startOutputDots) | |
process.stdout.write('_') | |
break | |
} | |
if (this.startOutputDots) | |
process.stdout.write('.') | |
try { | |
release = await this.sendersMutex.acquire() | |
sender = null | |
for (let i = 0; i < this.senders.length; i++) { | |
if (tried.has(this.senders[i])) | |
continue | |
tried.add(this.senders[i]) | |
amount = this.optimalAmount(receiver, this.senders[i], tolerance) | |
if (! amount || amount < MINIMUM_FOR_BALANCE || this.isBannedLocalChanId(this.senders[i])) | |
continue | |
sender = this.senders.splice(i, 1)[0] | |
break | |
} | |
} | |
finally { | |
release() | |
} | |
if (sender) { | |
if (await this.considerChance(receiver, sender, amount)) | |
break | |
} | |
else { | |
backArray.push(receiver) | |
if (this.startOutputDots) | |
process.stdout.write('_') | |
break | |
} | |
} | |
} | |
async rebalancing() { | |
this.receivers = [] | |
this.senders = [] | |
this.sendersMutex = new Mutex() | |
// Проходим по каналам и собираем информацию для корректировки | |
for (let receiver of this.candidates) { | |
// Проходим по кандидатам на поиск маршрутов от send to receive | |
if (receiver.type === TYPE_RECEIVER) { | |
if (receiver.maxReceive >= MINIMUM_FOR_BALANCE) | |
this.receivers.push(receiver) | |
} | |
} | |
for (let sender of this.candidates) { | |
if (sender.type === TYPE_SENDER) { | |
if (sender.maxSend >= MINIMUM_FOR_BALANCE) | |
this.senders.push(sender) | |
} | |
} | |
for (let i = this.options.startRound - 1; i < AMOUNT_ROUNDS; i++) { | |
// Create a pool. | |
let pool = new PromisePool(this.findingRoutesGenerator(i), this.concurrency) | |
pool.addEventListener('rejected', function (event) { | |
console.error('findingRoutesGenerator: ОШИБКА: error: %o: ', event.data.error.message) | |
}) | |
console.log(`Запуск поиска маршрутов в параллель: ${this.concurrency}`) | |
// Start the pool. | |
let poolPromise = pool.start() | |
// Wait for the pool to settle. | |
await poolPromise | |
console.log(`---------------------------------- | |
Ребаланс (раунд) завершён | |
На данный момент с момента запуска: | |
Успешных платежей: ${this.okAmntPayments} | |
Неуспешных платежей: ${this.failedAmntPayments} | |
Затрачено: ${Math.floor(this.totalPaidFees / 1000)} sats | |
---------------------------------- | |
`) | |
} | |
// После последнего цикла у нас остаётся список игнорируемых каналов и узлов, где не прошли даже малые платежи | |
// Эти списки мы объединяем с перманентными (между всеми циклами), | |
// чтобы при следующих ребалансингах этот список помогал нам быстрее обходить такие каналы стороной | |
this.mergeIgnoredNodesWithPermanentOnes() | |
this.mergeIgnoredLocalChanIdWithPermanentOnes() | |
this.mergeIgnoredEdgesWithPermanent() | |
} | |
* findingRoutesGenerator(i) { | |
let amntDone = 0, percent = 0, amntFull = this.receivers.length | |
let tolerance = i ? ROUND_FACTOR ** i : 0 | |
this.resetIgnores() | |
// Проходим по каналам и собираем информацию для корректировки | |
console.log("Количество кандидатов this.receivers: %d, this.senders: %d", this.receivers.length, this.senders.length) | |
let options = this.options = Object.assign({}, this.constructorOptions) | |
/* | |
let countdown = AMOUNT_ROUNDS - i - 1 | |
options.feeRate = Math.round(options.feeRate / ROUND_FACTOR ** countdown) | |
options.feeBase = Math.round(options.feeBase / ROUND_FACTOR ** countdown) | |
*/ | |
options.maxBalancedPayment = Math.round(options.maxBalancedPayment / ROUND_FACTOR ** i) | |
if (! options.maxBalancedPayment || options.maxBalancedPayment < MINIMUM_FOR_BALANCE / 2) | |
options.maxBalancedPayment = MINIMUM_FOR_BALANCE / 2 | |
console.log("---------------------------\nРаунд # %d", i + 1) | |
console.log("Опция --fee-rate: %d", options.feeRate) | |
console.log("Опция --fee-base: %d", options.feeBase) | |
console.log("Опция --concurrency: %d", options.concurrency) | |
console.log("Опция --max-balanced-payment: %d", options.maxBalancedPayment) | |
let receiver, backArray = [] | |
this.startOutputDots = false | |
while ((receiver = this.receivers.shift())) { | |
yield this.considerChanceForReceiver(receiver, backArray, tolerance) | |
amntDone++ | |
if (amntDone / amntFull * 100 >= percent) { | |
console.log( | |
"Сделано %d%% (%d), Fees: %d sats, Успешных: %d, Без маршрута: %d, Таймаут: %d, Неудачных: %d", | |
percent, | |
amntDone, | |
Math.floor(this.totalPaidFees / 1000), | |
this.okAmntPayments, | |
this.noRouteAmntPayments, | |
this.timedOutAmntPayments, | |
this.failedAmntPayments | |
) | |
percent++ | |
} | |
} | |
this.startOutputDots = true | |
this.receivers = backArray | |
} | |
} | |
module.exports = async function (opts = {}) { | |
if (opts.attempts < 1) | |
throw new Error('--attempts должна быть более ноля') | |
let rebalancer = new Rebalancer(opts) | |
for (let i = 1; i <= 5; i++) { | |
console.log(`Запуск ${i}/5\n\n`) | |
await rebalancer.run() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment