Skip to content

Instantly share code, notes, and snippets.

@yaauie
Last active September 4, 2025 20:59
Show Gist options
  • Save yaauie/c7350c91bd84b824e749d118762f6143 to your computer and use it in GitHub Desktop.
Save yaauie/c7350c91bd84b824e749d118762f6143 to your computer and use it in GitHub Desktop.
A quick and dirty SLOW tool for reading a logstash persisted queue checkpoint file and page metadata

Usage

Checkpoints:

Single Checkpoint:

lsq-cpdump "${LOGSTASH_HOME}/data/queue/main/checkpoint.head"

All Checkpoints for a given queue:

find "${LOGSTASH_HOME}/data/queue/main" -name 'checkpoint.*' | sort | xargs -L1 lsq-cpdump

A checkpoint has the following info:

# CHECKPOINT data/queue/main/checkpoint.head
VERSION [            0001]: 1
PAGENUM [        0000001B]: 27
1UNAKPG [        0000001B]: 27
1UNAKSQ [0000000000029D42]: 171330
MINSEQN [00000000000298B7]: 170167
ELEMNTS [        0000048B]: 1163
CHECKSM [        CADD8469]

Pages:

Page dumps are not fast, but can give info about the sequence number, size, source-page, and serialization format of the elements in the page.

Single Page:

lsq-pagedump "${LOGSTASH_HOME}/data/queue/main/page.0"

All Pages for a given queue:

find "${LOGSTASH_HOME}/data/queue/main" -name 'page.*' | sort | xargs -L1 lsq-pagedump

A page dump has the following:

sequence	size	checksum	page_no	format
170167	11251	70FD37AC	page.27	CBOR(known)
170168	8722	BE5D0E3F	page.27	CBOR(known)
170169	11011	61547BEA	page.27	CBOR(known)
170170	11394	8DBAE3B8	page.27	CBOR(known)
170171	10784	303C6E11	page.27	CBOR(known)
170172	11095	D8FFEEC3	page.27	CBOR(known)
170173	13366	9B25E31B	page.27	CBOR(known)
170174	8821	9114A4DB	page.27	CBOR(known)
170175	9803	FFB8665F	page.27	CBOR(known)
170176	11345	43CEC558	page.27	CBOR(known)

Using a statistics tool like st, we can get info about the size distribution:

lsq-pagedump data/queue/main/page.27 | cut -f2 | st --min --max --mean --median --sd
min	median	max	mean	stddev
8628	10954	14045	10618.4	1132.37
#!/usr/bin/env bash
# lsq-cpdump: print the fields of a Logstash persisted-queue checkpoint file.
# Usage: lsq-cpdump <path to checkpoint file>
filename="${1:?path to file}"
if [[ ! -f "${filename}" ]]; then
  # diagnostics go to stderr so they never mix with the dump on stdout
  >&2 echo "ENOENT: ${filename}"
  exit 1
fi

# extract hexdump as array of bytes: one two-character uppercase hex string
# per element. Up to 40 bytes are read so that an oversized file is still
# noticed by the size check below.
read -r -a hexdump < <(
  xxd -u -l40 -ps "${filename}" | tr -d '\n' | sed 's/.\{2\}/& /g'
)

# ensure we have 34 bytes (the total width of the fields decoded below)
checkpoint_size_bytes="${#hexdump[@]}"
if (( checkpoint_size_bytes != 34 )); then
  >&2 echo "WARN: CHECKPOINT FILE SIZE MISMATCH (${checkpoint_size_bytes})"
fi
# Joins all arguments (hex byte strings) without separators and prints the
# result right-aligned in a 16-character field wrapped in square brackets.
# No trailing newline is written.
fmt_raw() {
  local IFS=''        # makes "$*" join the arguments with nothing in between
  local joined="$*"
  printf '[%16s]' "${joined}"
}
# Joins all arguments (hex byte strings, most significant first) and prints
# the joined hex right-aligned in a 16-character bracketed field, followed by
# ": " and the same value in decimal. No trailing newline is written.
fmt_int() {
  local hex_digits value
  printf -v hex_digits '%s' "$@"
  value=$(( 0x${hex_digits} ))
  printf '[%16s]: %s' "${hex_digits}" "${value}"
}
# Report: a header line naming the file, then one line per checkpoint field.
# Each field is a fixed slice (offset:length, in bytes) of the hexdump array;
# the trailing comment gives the field's width as a Java-style type name.
printf '%s\n' "# CHECKPOINT ${filename}"
printf '%s\n' "VERSION $(fmt_int "${hexdump[@]:0:2}")"   # short
printf '%s\n' "PAGENUM $(fmt_int "${hexdump[@]:2:4}")"   # int
printf '%s\n' "1UNAKPG $(fmt_int "${hexdump[@]:6:4}")"   # int
printf '%s\n' "1UNAKSQ $(fmt_int "${hexdump[@]:10:8}")"  # long
printf '%s\n' "MINSEQN $(fmt_int "${hexdump[@]:18:8}")"  # long
printf '%s\n' "ELEMNTS $(fmt_int "${hexdump[@]:26:4}")"  # int
printf '%s\n' "CHECKSM $(fmt_raw "${hexdump[@]:30:4}")"  # int
#!/usr/bin/env bash
# lsq-pagedump: list the elements stored in a Logstash persisted-queue page
# file, one tab-separated line per element.
# Usage: lsq-pagedump <path to pq page>
# Set DEBUG to a non-empty value to also dump each payload (as hex) on stderr.
filename="${1:?path to pq page}"
# the page's file name (without directory) is echoed in every output row
pagename="$(basename -- "${filename}")"
# Hex-encodes everything on stdin and writes it to stdout as one unbroken run
# of uppercase hex digits: the line breaks xxd inserts are removed and no
# trailing newline is written.
_bytes2hex() {
  local encoded
  encoded="$(xxd -u -ps)"
  printf '%s' "${encoded//$'\n'/}"
}
# Reads one line of hex digits from stdin and prints its value in decimal
# (no trailing newline).
# Empty input -- what the read helpers produce once the page is exhausted --
# prints 0 explicitly instead of depending on how bash evaluates a bare "0x",
# so callers that stop on a zero value terminate reliably.
_hex2num() {
  local hex=''
  # read returns non-zero when the input has no trailing newline, but it
  # still assigns whatever it received, so its status is deliberately ignored
  read -r hex || true
  printf '%s' "$(( 0x${hex:-0} ))"
}
# Consumes the next N bytes from stdin and prints them as uppercase hex.
# Arguments: $1 - number of bytes to consume (required)
# dd copies one byte at a time (bs=1), so exactly the requested number of
# bytes is taken from the shared stdin; dd's own status output is discarded.
_read_hex() {
  : "${1:?byte count}"
  dd bs=1 count="$1" 2>/dev/null \
    | _bytes2hex
}
# Consumes one byte from stdin and prints its value in decimal.
_read_tiny() {
  local -r width=1
  _read_hex "${width}" | _hex2num
}
# Consumes four bytes from stdin and prints their value (most significant
# byte first) in decimal.
_read_int() {
  local -r width=4
  _read_hex "${width}" | _hex2num
}
# Consumes eight bytes from stdin and prints their value (most significant
# byte first) in decimal.
_read_long() {
  local -r width=8
  _read_hex "${width}" | _hex2num
}
# Prints a short label for the serialization format of an element payload.
# Arguments: $1 - the payload as uppercase hex digits (required, non-empty)
# The detectors are tried in order and the first that succeeds prints the
# label. If none matches, the payload is reported as assumed CBOR together
# with its first eight hex digits (i.e. its first four bytes). The payload is
# already hex, so it is shown as-is and passed to printf as data, never as
# part of the format string.
_describe_payload() {
  local payload="${1:?payload}"
  _detect_deflate "${payload}" \
    || _detect_zstd "${payload}" \
    || _detect_cbor "${payload}" \
    || printf 'CBOR(assumed %s)' "${payload:0:8}"
}
# Detects a zlib (RFC 1950) stream header at the start of a payload.
# Arguments: $1 - the payload as hex digits (required, non-empty)
# Outputs:   "DEFLATE(<level>)" where <level> is one of
#            fastest|fast|default|maximum
# Returns:   0 when the header matches, 1 otherwise
_detect_deflate() {
  local hex_payload="${1:?hex_payload}"
  # the first two bytes, read as one 16-bit big-endian value
  local header=$(( 0x${hex_payload:0:4} ))

  # Required bit pattern:  0---1000--0-----
  #   bit 15 clear, bits 11..8 equal to 1000, bit 5 clear
  local mask=$(( 2#1000111100100000 ))
  local want=$(( 2#0000100000000000 ))
  if (( (header & mask) != want )); then
    return 1
  fi

  # the 16-bit header must also be a multiple of 31
  if (( header % 31 != 0 )); then
    return 1
  fi

  # bits 7..6 select the compression level name
  local flevels=(fastest fast default maximum)
  local level_index=$(( (header >> 6) & 3 ))
  printf '%s' "DEFLATE(${flevels[level_index]})"
}
# Detects a Zstandard frame (RFC 8878) at the start of a payload and reports
# the decompressed size declared in its frame header.
# Arguments: $1 - the payload as uppercase hex digits (required, non-empty)
# Outputs:   "ZSTD(<content size>)", or "ZSTD(unknown)" when the frame header
#            carries no Frame_Content_Size field or the payload is too short
#            to contain it
# Returns:   0 when the magic number matches, 1 otherwise
_detect_zstd() {
  local hex_payload="${1:?hex_payload}"
  if [[ "${hex_payload:0:8}" != "28B52FFD" ]]; then
    return 1
  fi

  local descriptor_hex="${hex_payload:8:2}"
  if (( ${#descriptor_hex} != 2 )); then
    printf '%s' "ZSTD(unknown)"
    return 0
  fi

  # Frame_Header_Descriptor: bits 7-6 FCS flag, bit 5 single-segment flag,
  # bits 1-0 dictionary-id flag
  local descriptor=$(( 0x${descriptor_hex} ))
  local fcs_flag=$(( descriptor >> 6 ))
  local ss_flag=$(( (descriptor >> 5) & 1 ))
  local did_flag=$(( descriptor & 3 ))

  # the dictionary-id flag is a size code, not a byte count: 3 means 4 bytes
  local did_sizes=(0 1 2 4)
  local did_byte_length=${did_sizes[did_flag]}

  # FCS field width: flag 0 means 1 byte when single-segment is set and no
  # field at all otherwise; flags 1..3 mean 2, 4 and 8 bytes
  local fcs_byte_length
  if (( fcs_flag == 0 )); then
    fcs_byte_length=${ss_flag}
  else
    fcs_byte_length=$(( 1 << fcs_flag ))
  fi
  if (( fcs_byte_length == 0 )); then
    printf '%s' "ZSTD(unknown)"
    return 0
  fi

  # magic (4) + descriptor (1) + window descriptor (1, absent when
  # single-segment is set) + dictionary id
  local fcs_byte_offset=$(( 5 + (ss_flag == 0 ? 1 : 0) + did_byte_length ))
  local fcs_hex_le="${hex_payload:2*fcs_byte_offset:2*fcs_byte_length}"
  if (( ${#fcs_hex_le} != 2 * fcs_byte_length )); then
    printf '%s' "ZSTD(unknown)"
    return 0
  fi

  # the field is little-endian: reverse it byte by byte before parsing
  local fcs_hex_be='' i
  for (( i = 0; i < ${#fcs_hex_le}; i += 2 )); do
    fcs_hex_be="${fcs_hex_le:i:2}${fcs_hex_be}"
  done

  # when using 2-byte length, we add 256, otherwise we read as-is.
  local content_length=$(( 0x${fcs_hex_be} + (fcs_byte_length == 2 ? 256 : 0) ))
  printf '%s' "ZSTD(${content_length})"
}
# Recognizes a CBOR payload by its first three bytes.
# Arguments: $1 - the payload as uppercase hex digits (required, non-empty)
# Outputs:   "CBOR(self-desc)", "CBOR(stringref)" or "CBOR(known)"
# Returns:   0 when one of the prefixes matches, 1 otherwise
_detect_cbor() {
  local hex_payload="${1:?hex_payload}"
  case "${hex_payload:0:6}" in
    D9D9F7)
      # tag 55799, the CBOR self-describe marker
      printf '%s' "CBOR(self-desc)"
      ;;
    D90100)
      # tag 256
      printf '%s' "CBOR(stringref)"
      ;;
    9F716A)
      # NOTE(review): presumably the opening bytes Logstash's own serializer
      # emits (indefinite-length array, then a 17-character text string
      # beginning with "j") -- confirm against the serializer
      printf '%s' "CBOR(known)"
      ;;
    *)
      return 1
      ;;
  esac
}
# Reads one element from stdin and prints a tab-separated row for it:
#   sequence number, payload length, checksum (hex), page name, format label
# Layout consumed, in order: 8-byte sequence number, 4-byte payload length,
# the payload itself, 4-byte checksum.
# Globals:   pagename (read), DEBUG (read; when non-empty the payload hex is
#            also written to stderr)
# Returns:   1 when no further element is available (the sequence number is
#            missing or not positive), 0 after printing a row
_parse_event() {
  local seqnum length hex_payload checksum format=''

  seqnum=$(_read_long)
  # An empty value has to be tested on its own: (( )) on an empty string is a
  # syntax error, which the original treated as "keep going" and which would
  # make the caller loop forever at end of input.
  if [[ -z "${seqnum}" ]] || (( seqnum <= 0 )); then
    return 1
  fi

  length=$(_read_int)
  hex_payload=$(_read_hex "$(( length ))")
  checksum=$(_read_hex 4)

  # a zero-length payload has nothing to inspect; leave the label empty
  if [[ -n "${hex_payload}" ]]; then
    format=$(_describe_payload "${hex_payload}")
  fi

  printf '%s\t%s\t%s\t%s\t%s\n' \
    "${seqnum}" "${length}" "${checksum}" "${pagename}" "${format}"

  if [[ -n "${DEBUG}" ]]; then
    >&2 printf "%s\n" "${hex_payload}"
  fi
}
# Parses a page from stdin: checks the leading one-byte version marker, then
# prints one row per element until _parse_event reports there are no more.
# Exits the script with status 1 (message on stderr) when the version byte is
# missing or is not 2.
parse_stdin() {
  local version
  version=$(_read_tiny)
  # test for an empty value explicitly: (( )) on an empty string is a syntax
  # error that would otherwise let an unreadable input through
  if [[ -z "${version}" ]] || (( version != 2 )); then
    >&2 echo "ERR: Not Logstash PQ v2"
    exit 1
  fi
  while _parse_event; do
    :
  done
}
# Entry point: feed the page file named on the command line to the parser
# through stdin, which every read helper above consumes from.
parse_stdin < "${filename}"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment