A Zig helper to extract data from automatic full backups of Tencent Cloud Postgres (Tencent Cloud Postgres backup data extraction tool)
/// tencent cloud postgres data recovery helper
/// requirements:
/// [zig](https://ziglang.org/download/)
/// [docker](https://docker.com/) or [podman](https://podman.io/)
/// usage:
/// 1) update `recovery` (path to the downloaded .zst backup) and `tables` (what to export)
/// 2) `zig run txcloud_pgrecovery.zig`
const std = @import("std");
const print = std.debug.print;
const path = std.fs.path;
const Child = std.process.Child;
const ArrayList = std.ArrayList;
const pga = std.heap.page_allocator;
// table configuration used to generate the tsv exports and update sql
const Table = struct {
    db: []const u8,
    name: []const u8,
    fields: []const u8,
    types: []const u8,
    conds: []const u8,
    cache: bool,
    call: ?(*const fn ([]const u8, Table) anyerror!void),
};
// full path of an auto backup downloaded from https://console.cloud.tencent.com/postgres
const recovery = "/Users/mindon/Downloads/automatic-20240428030826.tar.zst";
// configure which data to export and which update sql to generate
const tables: [1]Table = .{
    .{
        .db = "hello", // database name
        .name = "world", // table name
        .fields = "id,a,b,c", // comma-separated field list
        .types = ",n,,", // field types: s or empty = quoted string, jsonb = cast to ::jsonb, anything else (e.g. n for number) = unquoted
        .conds = "", // conditions: [where ...] [order by ...] [limit ...]
        .cache = false, // if true, reuse an existing tsv without querying again
        .call = &sqlUpdate, // callback to handle the tsv contents, or null
    },
};
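// Illustrative only (hypothetical database/table/column names, not taken from any
// real backup): an extra `tables` entry exporting a jsonb column with a row filter
// could be configured roughly like this (remember to bump the array size, e.g. [2]Table):
//   .{
//       .db = "hello",
//       .name = "events",
//       .fields = "id,payload",
//       .types = ",jsonb", // id quoted as a string, payload cast to ::jsonb
//       .conds = "where id > 1000 order by id limit 500",
//       .cache = true, // keep and reuse events.tsv once it has been exported
//       .call = &sqlUpdate,
//   },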
/// auto backup recovery based on the guide: https://cloud.tencent.com/document/product/409/11642
pub fn main() !void {
    var ret: u8 = 0;
    const term, const stdout, const stderr = shell(pga, &[_][]const u8{
        "ls",
        "-lh",
        recovery,
    }) catch |err| .{ Child.Term{ .Exited = 9 }, "", @errorName(err) };
    defer {
        if (term.Exited != 9) {
            pga.free(stdout);
            pga.free(stderr);
        }
    }
    ret = term.Exited;
    print("{s} - err: {s}\n", .{ stdout, stderr });
    // prepare recovery directory
    const dir = path.dirname(recovery);
    var redir: []u8 = undefined;
    if (dir) |v| {
        const name = path.basename(recovery);
        const i = std.mem.indexOf(u8, name, ".");
        redir = try path.resolve(pga, &[_][]const u8{ v, if (i) |n| name[0..n] else name });
    } else {
        redir = try pga.dupe(u8, "./recovery");
    }
    defer pga.free(redir);
    print("{s}\n", .{redir});
    var target_dir = std.fs.cwd().openDir(redir, .{}) catch blk: {
        if (ret != 0) {
            return error.FileNotFound;
        }
        try std.fs.cwd().makePath(redir);
        break :blk try std.fs.cwd().openDir(redir, .{});
    };
    target_dir.close();
    // prepare postgresql.conf for local usage
    const flpath = try path.resolve(pga, &[_][]const u8{ redir, "postgresql.conf" });
    const fd = std.fs.openFileAbsolute(flpath, .{ .mode = .read_write }) catch blk: {
        if (ret != 0) {
            return error.FileNotFound;
        }
        dezstd(recovery, redir) catch {};
        break :blk std.fs.openFileAbsolute(flpath, .{ .mode = .read_write }) catch null;
    };
    if (fd) |fp| {
        try modify(fp);
        fp.close();
        try take(redir);
    } else {
        return error.FileNotFound;
    }
}
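// Overall flow (summary of the code above and below): main() unpacks the .tar.zst
// backup into a sibling directory, patches postgresql.conf via modify(), then take()
// starts a throwaway postgres container over the recovered data directory, COPYs each
// configured table to a tsv file, and finally invokes each table's callback
// (here sqlUpdate) on the result.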
/// extract data from the recovered postgres
fn take(redir: []const u8) !void {
    // detect pg version
    const flver = try path.resolve(pga, &[_][]const u8{ redir, "PG_VERSION" });
    defer pga.free(flver);
    const fdver = try std.fs.openFileAbsolute(flver, .{ .mode = .read_only });
    defer fdver.close();
    const sizever = if (fdver.stat() catch null) |s| s.size else 0;
    var ver: []const u8 = "9.5";
    if (sizever > 0) {
        const buffer = try pga.alloc(u8, sizever);
        defer pga.free(buffer);
        try fdver.reader().readNoEof(buffer);
        ver = try pga.dupe(u8, std.mem.trim(u8, buffer, " \r\n"));
    }
    const postgre_img = try std.fmt.allocPrint(pga, "postgres:{s}-alpine", .{ver});
    defer pga.free(postgre_img);
    if (sizever > 0) pga.free(ver);
    print("Postgres image: {s}\n", .{postgre_img});
    // detect docker; fall back to podman if that fails
    var cmd: []const u8 = "docker";
    const term, const stdout_, const stderr_ = try shell(pga, &[_][]const u8{
        cmd,
        "-v",
    });
    if (term.Exited != 0) {
        cmd = "podman";
        const term__, const stdout__, const stderr__ = try shell(pga, &[_][]const u8{
            cmd,
            "-v",
        });
        print("{s}\n", .{if (term__.Exited != 0) stderr__ else stdout__});
        pga.free(stdout__);
        pga.free(stderr__);
    } else {
        print("{s}\n", .{stdout_});
    }
    pga.free(stdout_);
    pga.free(stderr_);
    const fldata = try path.resolve(pga, &[_][]const u8{ redir, "data.sh" });
    var fddata = try std.fs.cwd().createFile(fldata, .{});
    defer fddata.close();
    var datalines = std.ArrayList(u8).init(pga);
    defer datalines.deinit();
    try datalines.appendSlice("pg_ctl start -D /recovery -w\n");
    var n: u32 = 0;
    for (tables) |t| {
        const flname = try std.fmt.allocPrint(pga, "{s}.tsv", .{t.name});
        const flpath = try path.resolve(pga, &[_][]const u8{ redir, flname });
        const fd = std.fs.openFileAbsolute(flpath, .{ .mode = .read_only }) catch null;
        var size: u64 = 0;
        if (fd) |fp| {
            size = if (fp.stat() catch null) |s| s.size else 0;
            fp.close();
        }
        if (size == 0 and !t.cache) {
            const line = try std.fmt.allocPrint(pga, "psql -d {s} -c \"COPY (SELECT {s} FROM {s} {s}) TO '/recovery/{s}.tsv' WITH DELIMITER E'\\t' CSV HEADER;\"\n", .{
                t.db,
                t.fields,
                t.name,
                t.conds,
                t.name,
            });
            defer pga.free(line);
            try datalines.appendSlice(line);
            n += 1;
        } else {
            print("[SKIP] -- {s} exists\n", .{flname});
        }
    }
    try fddata.writeAll(datalines.items);
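    // Illustrative only: with the sample `hello`/`world` config above, data.sh
    // ends up containing roughly:
    //   pg_ctl start -D /recovery -w
    //   psql -d hello -c "COPY (SELECT id,a,b,c FROM world ) TO '/recovery/world.tsv' WITH DELIMITER E'\t' CSV HEADER;"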
    if (n > 0) {
        const flcmd = try path.resolve(pga, &[_][]const u8{ redir, "recover.sh" });
        var fdcmd = try std.fs.cwd().createFile(flcmd, .{});
        defer fdcmd.close();
        const cmdlines =
            \\#!/bin/bash
            \\chmod 0700 /recovery
            \\chown postgres:postgres /recovery
            \\su postgres -c "sh /recovery/data.sh"
        ;
        try fdcmd.writeAll(cmdlines);
        _, const out, const err = try shell(pga, &[_][]const u8{
            "chmod",
            "+x",
            flcmd,
        });
        pga.free(out);
        pga.free(err);
        // run recover.sh
        const mapping_dir = try std.fmt.allocPrint(pga, "{s}:/recovery", .{redir});
        defer pga.free(mapping_dir);
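        // Illustrative only: assuming docker was detected and PG_VERSION read "12",
        // the shell() call below is roughly equivalent to running
        //   docker run --rm -v <redir>:/recovery postgres:12-alpine /recovery/recover.sh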
        _, const stdout, const stderr = try shell(pga, &[_][]const u8{
            cmd,
            "run",
            "--rm",
            "-v",
            mapping_dir,
            postgre_img,
            "/recovery/recover.sh",
        });
        print("{s} - err: {s}\n", .{ stdout, stderr });
        pga.free(stdout);
        pga.free(stderr);
    }
    for (tables) |t| {
        if (t.call) |cb| {
            const flpath = try path.resolve(pga, &[_][]const u8{ redir, t.name });
            try cb(flpath, t);
        }
    }
}
// generate update sql
fn sqlUpdate(flpath: []const u8, t: Table) !void {
    const fltsv = try std.fmt.allocPrint(pga, "{s}.tsv", .{flpath});
    defer pga.free(fltsv);
    const fd = try std.fs.openFileAbsolute(fltsv, .{ .mode = .read_only });
    defer fd.close();
    const size = if (fd.stat() catch null) |s| s.size else 0;
    print("{d} - {s}\n", .{ size, fltsv });
    if (size == 0) {
        return error.EmptyFile;
    }
    const buffer = try pga.alloc(u8, size);
    defer pga.free(buffer);
    try fd.reader().readNoEof(buffer);
    var lines = std.mem.split(u8, buffer, "\n");
    var sqls = std.ArrayList(u8).init(pga);
    defer sqls.deinit();
    var n: u32 = 0;
    var fields = std.mem.split(u8, t.fields, ",");
    var types = std.mem.split(u8, t.types, ",");
    while (lines.next()) |line| {
        var d = std.mem.split(u8, line, "\t");
        if (n == 0) {
            n += 1;
            continue;
        }
        fields.reset();
        types.reset();
        var i: u32 = 0;
        var identify: []const u8 = "";
        var data = std.ArrayList(u8).init(pga);
        while (fields.next()) |part| {
            if (d.next()) |value| {
                const vt = if (types.next()) |tv| tv else "s";
                var v = value;
                var free = false;
                if (std.mem.startsWith(u8, v, "\"") and std.mem.endsWith(u8, v, "\"")) {
                    const output = try pga.alloc(u8, std.mem.replacementSize(u8, v, "\"\"", "\""));
                    _ = std.mem.replace(u8, v, "\"\"", "\"", output);
                    v = try pga.dupe(u8, output[1..(output.len - 1)]);
                    free = true;
                    pga.free(output);
                }
                // TODO: form the update sql
                if (std.mem.eql(u8, vt, "jsonb") or (std.mem.startsWith(u8, v, "{") and std.mem.endsWith(u8, v, "}"))) {
                    const s = try std.fmt.allocPrint(pga, "{s}='{s}'::jsonb", .{ part, v });
                    if (i == 0) {
                        identify = s;
                    } else {
                        try data.appendSlice(",");
                        try data.appendSlice(s);
                    }
                    // print("{d}: {s}='{s}'::jsonb\n", .{ i, part, v });
                } else {
                    const s = if (vt.len == 0 or std.mem.eql(u8, vt, "s"))
                        try std.fmt.allocPrint(pga, "{s}='{s}'", .{ part, std.mem.trim(u8, v, " ") })
                    else
                        try std.fmt.allocPrint(pga, "{s}={s}", .{ part, std.mem.trim(u8, v, " ") });
                    if (i == 0) {
                        identify = s;
                    } else {
                        try data.appendSlice(",");
                        try data.appendSlice(s);
                    }
                    // print("{d}: {s}='{s}'\n", .{ i, part, std.mem.trim(u8, v, " ") });
                }
                if (free) {
                    pga.free(v);
                }
                i += 1;
            }
        }
        if (data.items.len > 1) {
            const sql = try std.fmt.allocPrint(pga, "UPDATE {s} SET {s} WHERE {s};\n", .{ path.basename(flpath), data.items[1..], identify });
            try sqls.appendSlice(sql);
        }
        data.deinit();
        n += 1;
    }
    const flsql = try std.fmt.allocPrint(pga, "{s}.sql", .{flpath});
    const fdsql = try std.fs.cwd().createFile(flsql, .{});
    defer fdsql.close();
    try fdsql.writeAll(sqls.items);
    print("[SQL] -- {s}\n\n", .{flsql});
}
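// Illustrative only: for the sample `world` table above, a tsv data row such as
//   1<TAB>2<TAB>x<TAB>"{""k"": ""v""}"
// would make sqlUpdate emit roughly:
//   UPDATE world SET a=2,b='x',c='{"k": "v"}'::jsonb WHERE id='1';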
/// run a shell command, returning its termination status and trimmed stdout/stderr
fn shell(allocator: std.mem.Allocator, argv: []const []const u8) !std.meta.Tuple(&.{ Child.Term, []u8, []u8 }) {
    var arena = std.heap.ArenaAllocator.init(allocator);
    defer arena.deinit();
    const alloc_arena = arena.allocator();
    var child = Child.init(argv, alloc_arena);
    child.stdout_behavior = .Pipe;
    child.stderr_behavior = .Pipe;
    var stdout = ArrayList(u8).init(alloc_arena);
    var stderr = ArrayList(u8).init(alloc_arena);
    defer {
        stdout.deinit();
        stderr.deinit();
    }
    try child.spawn();
    try child.collectOutput(&stdout, &stderr, 1024);
    const term = try child.wait();
    const a = std.mem.trim(u8, stdout.items, " \t\n\r");
    const out = try allocator.alloc(u8, a.len);
    if (a.len > 0) {
        @memcpy(out, a);
    }
    const b = std.mem.trim(u8, stderr.items, " \t\n\r");
    const err = try allocator.alloc(u8, b.len);
    if (b.len > 0) {
        @memcpy(err, b);
    }
    return .{
        term,
        out,
        err,
    };
}
/// decompress the .tar.zst backup file into redir
fn dezstd(flpath: []const u8, redir: []const u8) !void {
    _, const stdout, const stderr = try shell(pga, &[_][]const u8{
        "tar",
        "--zstd",
        "-C",
        redir,
        "-xf",
        flpath,
    });
    print("{s} {s}\n", .{ stdout, stderr });
    pga.free(stdout);
    pga.free(stderr);
    {
        const labels = try path.resolve(pga, &[_][]const u8{ redir, "backup_label" });
        _, const out, const err = try shell(pga, &[_][]const u8{
            "rm",
            "-rf",
            labels,
        });
        pga.free(out);
        pga.free(err);
    }
}
/// comment out lines in postgresql.conf for local recovery
fn modify(fp: std.fs.File) !void {
    const clues =
        \\shared_preload_libraries
        \\local_preload_libraries
        \\pg_stat_statements.max
        \\pg_stat_statements.track
        \\archive_mode
        \\archive_command
        \\synchronous_commit
        \\synchronous_standby_names
        \\port =
        \\unix_socket_directories
        \\include = 'standby.conf'
    ;
    var comments = std.mem.split(u8, clues, "\n");
    const size = if (fp.stat() catch null) |s| s.size else 0;
    // print("{d}\n", .{size});
    if (size == 0) {
        return error.EmptyFile;
    }
    const buffer = try pga.alloc(u8, size);
    defer pga.free(buffer);
    try fp.reader().readNoEof(buffer);
    var lines = std.mem.split(u8, buffer, "\n");
    var updated = std.ArrayList(u8).init(pga);
    defer updated.deinit();
    var i: u32 = 0;
    var fixed = false;
    while (lines.next()) |line| {
        // print("{s}\n", .{line});
        if (i > 0) {
            try updated.appendSlice("\n");
        }
        if (std.mem.startsWith(u8, line, "port = '5432'")) {
            fixed = true;
        }
        if (!fixed) {
            comments.reset();
            while (comments.next()) |comment| {
                if (std.mem.startsWith(u8, line, comment)) {
                    try updated.appendSlice("# ");
                    break;
                }
            }
        }
        try updated.appendSlice(line);
        i += 1;
    }
    if (fixed) {
        return;
    }
    const fixing =
        \\
        \\port = '5432'
        \\unix_socket_directories = '/var/run/postgresql/'
        \\synchronous_commit = local
        \\synchronous_standby_names = ''
        \\
    ;
    try updated.appendSlice(fixing);
    try fp.seekTo(0);
    try fp.writer().writeAll(updated.items);
}