Skip to content

Instantly share code, notes, and snippets.

@buzz
Last active May 15, 2026 03:47
Show Gist options
  • Select an option

  • Save buzz/1c439684d5e3f36492ae9f64ef7e3f67 to your computer and use it in GitHub Desktop.

Select an option

Save buzz/1c439684d5e3f36492ae9f64ef7e3f67 to your computer and use it in GitHub Desktop.
Transplant MTP block from one GGUF file into another
#!/usr/bin/env python3
"""
Transplant extra tensors (e.g. MTP layers) from one GGUF file into another,
producing a mixed-quantization GGUF.
Note: Tested with ik_llama.cpp GGUF Python module.
Usage:
python convert.py <target.gguf> <source.gguf> <output.gguf>
Arguments:
target — base GGUF (tensors + metadata kept as-is)
source — GGUF with extra blocks to transplant (e.g. blk.64.* for MTP)
output — resulting mixed-quantization GGUF
The script preserves the exact on-disk layout including per-row metadata
for quantization types like IQ4_KS that have row_meta_size > 0. This is
critical for GPU inference to work correctly.
Donor Models:
To save bandwidth, you can use 'tensors-only' GGUFs as the source file.
Credits to AzerbaijanNyan for the extraction:
https://www.reddit.com/r/LocalLLaMA/comments/1t6r1ny/extracted_mtp_tensor_ggufs_smaller_donor_models/
Available donors:
- Qwen3.6-35A3B: https://huggingface.co/IHaveNoClueAndIMustPost/Qwen3.6-35A3B-MTP-TENSORS-ONLY
- Qwen3.6-27b: https://huggingface.co/IHaveNoClueAndIMustPost/Qwen3.6-27b-MTP-TENSORS-ONLY
Example:
# Transplant MTP block from Q8_0 into IQ4_KS base model
python convert.py Qwen3.6-27B-IQ4_KS.gguf Qwen3.6-27B-MTP-Q8_0.gguf Qwen3.6-27B-MTP-IQ4_KS.gguf
"""
import hashlib
import sys
import struct
from pathlib import Path
from gguf import GGUFReader, GGUFValueType
def get_field_value(reader: GGUFReader, key: str):
"""Safely get a field value from GGUFReader."""
field = reader.get_field(key)
return field.contents() if field else None
def calculate_on_disk_sizes(tensors, file_size):
"""Calculate on-disk size for each tensor (including per-row metadata/padding)."""
n_tensors = len(tensors)
sizes = []
for i in range(n_tensors):
if i < n_tensors - 1:
sizes.append(tensors[i + 1].data_offset - tensors[i].data_offset)
else:
sizes.append(file_size - tensors[i].data_offset)
return sizes
def write_kv_value(fout, kv_type, value):
"""Write a KV value to the output file."""
if kv_type == GGUFValueType.STRING:
value_bytes = value.encode("utf-8")
fout.write(struct.pack("<Q", len(value_bytes)))
fout.write(value_bytes)
elif kv_type == GGUFValueType.ARRAY:
# This is handled separately in the main code
pass
elif kv_type in (GGUFValueType.UINT8, GGUFValueType.INT8, GGUFValueType.BOOL):
fout.write(struct.pack("<B", value))
elif kv_type in (GGUFValueType.UINT16, GGUFValueType.INT16):
fout.write(struct.pack("<H", value))
elif kv_type in (GGUFValueType.UINT32, GGUFValueType.INT32):
fout.write(struct.pack("<I", value))
elif kv_type == GGUFValueType.FLOAT32:
fout.write(struct.pack("<f", value))
elif kv_type in (GGUFValueType.UINT64, GGUFValueType.INT64):
fout.write(struct.pack("<Q", value))
elif kv_type == GGUFValueType.FLOAT64:
fout.write(struct.pack("<d", value))
def write_array_value(fout, sub_type, arr):
"""Write an array KV value to the output file."""
fout.write(struct.pack("<I", int(sub_type)))
fout.write(struct.pack("<Q", len(arr)))
for elem in arr:
if sub_type == GGUFValueType.STRING:
elem_bytes = elem.encode("utf-8")
fout.write(struct.pack("<Q", len(elem_bytes)))
fout.write(elem_bytes)
elif sub_type in (GGUFValueType.UINT8, GGUFValueType.INT8, GGUFValueType.BOOL):
fout.write(struct.pack("<B", elem))
elif sub_type in (GGUFValueType.UINT16, GGUFValueType.INT16):
fout.write(struct.pack("<H", elem))
elif sub_type in (GGUFValueType.UINT32, GGUFValueType.INT32):
fout.write(struct.pack("<I", elem))
elif sub_type == GGUFValueType.FLOAT32:
fout.write(struct.pack("<f", elem))
elif sub_type in (GGUFValueType.UINT64, GGUFValueType.INT64):
fout.write(struct.pack("<Q", elem))
elif sub_type == GGUFValueType.FLOAT64:
fout.write(struct.pack("<d", elem))
def main() -> None:
if len(sys.argv) != 4:
print(
f"Usage: {sys.argv[0]} <target.gguf> <source.gguf> <output.gguf>",
file=sys.stderr,
)
sys.exit(1)
target_path, source_path, output_path = sys.argv[1], sys.argv[2], sys.argv[3]
# ------------------------------------------------------------------
# 1. Open both files
# ------------------------------------------------------------------
print(f"Reading target: {target_path}")
target_reader = GGUFReader(target_path)
print(f"Reading source: {source_path}")
source_reader = GGUFReader(source_path)
target_file_size = Path(target_path).stat().st_size
source_file_size = Path(source_path).stat().st_size
print(
f" Target tensors: {len(target_reader.tensors)}, KVs: {len([k for k in target_reader.fields if not k.startswith('GGUF.')])}"
)
print(
f" Source tensors: {len(source_reader.tensors)}, KVs: {len([k for k in source_reader.fields if not k.startswith('GGUF.')])}"
)
# ------------------------------------------------------------------
# 2. Read architecture and MTP metadata from source
# ------------------------------------------------------------------
arch = get_field_value(target_reader, "general.architecture")
if arch is None:
print("ERROR: Target GGUF has no general.architecture key")
sys.exit(1)
source_block_count = get_field_value(source_reader, f"{arch}.block_count")
source_nextn = get_field_value(source_reader, f"{arch}.nextn_predict_layers")
if source_nextn is None:
print("ERROR: Source GGUF has no nextn_predict_layers key")
sys.exit(1)
target_block_count = get_field_value(target_reader, f"{arch}.block_count")
print(f"\n Arch: {arch}")
print(f" Target block_count: {target_block_count}")
print(
f" Source block_count: {source_block_count}, nextn_predict_layers: {source_nextn}"
)
# Identify extra tensors in the source (blocks beyond target's count)
source_extra = [
t
for t in source_reader.tensors
if t.name.startswith(f"blk.{target_block_count}.")
]
print(f"\n Extra tensors to transplant: {len(source_extra)}")
if not source_extra:
print(
f"ERROR: No tensors found with prefix 'blk.{target_block_count}.' in source"
)
sys.exit(1)
# ------------------------------------------------------------------
# 3. Prepare tensor lists and calculate sizes
# ------------------------------------------------------------------
# Combine tensors: all from target + extra from source
all_tensors = list(target_reader.tensors) + source_extra
# Calculate on-disk sizes for source tensors (including per-row metadata)
target_on_disk_sizes = calculate_on_disk_sizes(
target_reader.tensors, target_file_size
)
source_on_disk_sizes = calculate_on_disk_sizes(
source_reader.tensors, source_file_size
)
# Create mapping for source tensors
source_tensor_map = {
t.name: (t, size)
for t, size in zip(source_reader.tensors, source_on_disk_sizes)
}
# ------------------------------------------------------------------
# 4. Write output file
# ------------------------------------------------------------------
print(f"\nWriting output: {output_path}")
with (
open(target_path, "rb") as target_fin,
open(source_path, "rb") as source_fin,
open(output_path, "wb") as fout,
):
# 4.1 Write header
# Magic (4 bytes)
fout.write(b"GGUF")
# Version (4 bytes)
fout.write(struct.pack("<I", 3))
# Tensor count (8 bytes)
fout.write(struct.pack("<Q", len(all_tensors)))
# Calculate KV count
kv_count = len(
[k for k in target_reader.fields.keys() if not k.startswith("GGUF.")]
)
kv_count += 1 # block_count override
# Add source-only KVs (excluding block_count and nextn_predict_layers)
for key in source_reader.fields:
if (
not key.startswith("GGUF.")
and key not in target_reader.fields
and key != f"{arch}.block_count"
and key != f"{arch}.nextn_predict_layers"
):
kv_count += 1
# KV count (8 bytes)
fout.write(struct.pack("<Q", kv_count))
# 4.2 Write KV data from target (with block_count override)
written_keys = set()
for key, field in target_reader.fields.items():
if key.startswith("GGUF."):
continue
# Skip block_count (we'll override it)
if key == f"{arch}.block_count":
continue
# Write key
key_bytes = key.encode("utf-8")
fout.write(struct.pack("<Q", len(key_bytes)))
fout.write(key_bytes)
# Write type
kv_type = field.types[0]
fout.write(struct.pack("<I", int(kv_type)))
# Write value
if kv_type == GGUFValueType.STRING:
write_kv_value(fout, kv_type, field.contents())
elif kv_type == GGUFValueType.ARRAY:
sub_type = (
field.types[1] if len(field.types) > 1 else GGUFValueType.FLOAT32
)
write_array_value(fout, sub_type, field.contents())
else:
write_kv_value(fout, kv_type, field.contents())
written_keys.add(key)
# Add block_count from source
key = f"{arch}.block_count"
key_bytes = key.encode("utf-8")
fout.write(struct.pack("<Q", len(key_bytes)))
fout.write(key_bytes)
fout.write(struct.pack("<I", int(GGUFValueType.UINT32)))
fout.write(struct.pack("<I", source_block_count))
written_keys.add(key)
# Add nextn_predict_layers from source
key = f"{arch}.nextn_predict_layers"
key_bytes = key.encode("utf-8")
fout.write(struct.pack("<Q", len(key_bytes)))
fout.write(key_bytes)
fout.write(struct.pack("<I", int(GGUFValueType.UINT32)))
fout.write(struct.pack("<I", source_nextn))
written_keys.add(key)
# Copy source-only KVs
for key, field in source_reader.fields.items():
if (
key.startswith("GGUF.")
or key in written_keys
or key == f"{arch}.nextn_predict_layers"
):
continue
# Write key
key_bytes = key.encode("utf-8")
fout.write(struct.pack("<Q", len(key_bytes)))
fout.write(key_bytes)
# Write type
kv_type = field.types[0]
fout.write(struct.pack("<I", int(kv_type)))
# Write value
if kv_type == GGUFValueType.STRING:
write_kv_value(fout, kv_type, field.contents())
elif kv_type == GGUFValueType.ARRAY:
sub_type = (
field.types[1] if len(field.types) > 1 else GGUFValueType.FLOAT32
)
write_array_value(fout, sub_type, field.contents())
else:
write_kv_value(fout, kv_type, field.contents())
# 4.3 Write tensor info
# Calculate offsets for all tensors
current_offset = 0
tensor_offsets = []
for i, tensor in enumerate(all_tensors):
if i < len(target_reader.tensors):
size = target_on_disk_sizes[i]
else:
_, size = source_tensor_map[tensor.name]
tensor_offsets.append(current_offset)
current_offset += size
# Write tensor info for each tensor
for i, tensor in enumerate(all_tensors):
# Tensor name
name_bytes = tensor.name.encode("utf-8")
fout.write(struct.pack("<Q", len(name_bytes)))
fout.write(name_bytes)
# Dimensions (in GGUF file order: fastest-varying first)
shape = tensor.shape.tolist()
fout.write(struct.pack("<I", len(shape)))
for dim in shape:
fout.write(struct.pack("<Q", dim))
# Quantization type
fout.write(struct.pack("<I", int(tensor.tensor_type)))
# Offset
fout.write(struct.pack("<Q", tensor_offsets[i]))
# 4.4 Pad to alignment if needed
current_pos = fout.tell()
alignment = get_field_value(target_reader, "general.alignment") or 32
padding_needed = (alignment - (current_pos % alignment)) % alignment
if padding_needed:
fout.write(b"\x00" * padding_needed)
# 4.5 Copy tensor data
print(f"Copying {len(all_tensors)} tensors...")
for i, tensor in enumerate(all_tensors):
if i < len(target_reader.tensors):
# Target tensor
offset = target_reader.tensors[i].data_offset
size = target_on_disk_sizes[i]
fin = target_fin
else:
# Source extra tensor
src_tensor, size = source_tensor_map[tensor.name]
offset = src_tensor.data_offset
fin = source_fin
fin.seek(offset)
raw_data = fin.read(size)
fout.write(raw_data)
if (i + 1) % 50 == 0 or i == len(all_tensors) - 1:
print(f" Copied {i + 1}/{len(all_tensors)} tensors")
# ------------------------------------------------------------------
# 5. Verify output
# ------------------------------------------------------------------
output_size = Path(output_path).stat().st_size
print(f"\nOutput: {output_path}")
print(f" Size: {output_size / 1_000_000_000:.2f} GB")
print(f" Tensors: {len(all_tensors)}")
# Validate
print("\nValidating output...")
errors = []
try:
out_reader = GGUFReader(output_path)
# Check block_count
out_block_count = get_field_value(out_reader, f"{arch}.block_count")
if out_block_count != source_block_count:
errors.append(
f"block_count: expected {source_block_count}, got {out_block_count}"
)
# Check nextn_predict_layers
out_nextn = get_field_value(out_reader, f"{arch}.nextn_predict_layers")
if out_nextn != source_nextn:
errors.append(
f"nextn_predict_layers: expected {source_nextn}, got {out_nextn}"
)
# Check extra tensors exist
out_tensor_names = {t.name for t in out_reader.tensors}
for tensor in source_extra:
if tensor.name not in out_tensor_names:
errors.append(f"Missing tensor: {tensor.name}")
# Spot-check tensor data integrity
print(" Spot-checking tensor data integrity...")
out_tensors = {t.name: t for t in out_reader.tensors}
# Check a target tensor
for name in ["token_embd.weight"]:
if name in out_tensors and name in {t.name for t in target_reader.tensors}:
target_t = next(
(t for t in target_reader.tensors if t.name == name), None
)
out_t = out_tensors.get(name)
if target_t and out_t:
target_hash = hashlib.sha256(target_t.data.tobytes()).hexdigest()[
:16
]
out_hash = hashlib.sha256(out_t.data.tobytes()).hexdigest()[:16]
if target_hash == out_hash:
print(f" {name}: OK ({out_hash})")
else:
errors.append(f"Data mismatch: {name}")
# Check an extra tensor
if source_extra:
extra_name = source_extra[0].name
source_t = source_tensor_map[extra_name][0]
out_t = out_tensors.get(extra_name)
if out_t:
source_hash = hashlib.sha256(source_t.data.tobytes()).hexdigest()[:16]
out_hash = hashlib.sha256(out_t.data.tobytes()).hexdigest()[:16]
if source_hash == out_hash:
print(f" {extra_name}: OK ({out_hash})")
else:
errors.append(f"Data mismatch: {extra_name}")
except Exception as e:
errors.append(f"Failed to read output: {e}")
if errors:
print("\nVALIDATION FAILED:")
for err in errors:
print(f" - {err}")
sys.exit(1)
else:
print(" OK — all checks passed")
print(f"\nDone. Output: {output_path}")
if __name__ == "__main__":
main()
@resynth
Copy link
Copy Markdown

resynth commented May 3, 2026

Awesome, thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment