Transplant MTP block from one GGUF file into another
#!/usr/bin/env python3
"""
Transplant extra tensors (e.g. MTP layers) from one GGUF file into another,
producing a mixed-quantization GGUF.

Note: Tested with the ik_llama.cpp GGUF Python module.

Usage:
    python convert.py <target.gguf> <source.gguf> <output.gguf>

Arguments:
    target — base GGUF (tensors + metadata kept as-is)
    source — GGUF with extra blocks to transplant (e.g. blk.64.* for MTP)
    output — resulting mixed-quantization GGUF

The script preserves the exact on-disk layout, including per-row metadata
for quantization types like IQ4_KS that have row_meta_size > 0. This is
critical for GPU inference to work correctly.

Donor Models:
    To save bandwidth, you can use tensors-only GGUFs as the source file.
    Credits to AzerbaijanNyan for the extraction:
    https://www.reddit.com/r/LocalLLaMA/comments/1t6r1ny/extracted_mtp_tensor_ggufs_smaller_donor_models/

    Available donors:
    - Qwen3.6-35A3B: https://huggingface.co/IHaveNoClueAndIMustPost/Qwen3.6-35A3B-MTP-TENSORS-ONLY
    - Qwen3.6-27b: https://huggingface.co/IHaveNoClueAndIMustPost/Qwen3.6-27b-MTP-TENSORS-ONLY

Example:
    # Transplant the MTP block from a Q8_0 donor into an IQ4_KS base model
    python convert.py Qwen3.6-27B-IQ4_KS.gguf Qwen3.6-27B-MTP-Q8_0.gguf Qwen3.6-27B-MTP-IQ4_KS.gguf
"""

import hashlib
import struct
import sys
from pathlib import Path

from gguf import GGUFReader, GGUFValueType


def get_field_value(reader: GGUFReader, key: str):
    """Safely get a field value from GGUFReader."""
    field = reader.get_field(key)
    return field.contents() if field else None


def calculate_on_disk_sizes(tensors, file_size):
    """Calculate on-disk size for each tensor (including per-row metadata/padding)."""
    n_tensors = len(tensors)
    sizes = []
    for i in range(n_tensors):
        if i < n_tensors - 1:
            sizes.append(tensors[i + 1].data_offset - tensors[i].data_offset)
        else:
            sizes.append(file_size - tensors[i].data_offset)
    return sizes


def write_kv_value(fout, kv_type, value):
    """Write a KV value to the output file."""
    if kv_type == GGUFValueType.STRING:
        value_bytes = value.encode("utf-8")
        fout.write(struct.pack("<Q", len(value_bytes)))
        fout.write(value_bytes)
    elif kv_type == GGUFValueType.ARRAY:
        # Arrays are handled separately via write_array_value
        pass
    elif kv_type in (GGUFValueType.UINT8, GGUFValueType.BOOL):
        fout.write(struct.pack("<B", value))
    elif kv_type == GGUFValueType.INT8:
        # Signed format codes so negative values pack correctly
        fout.write(struct.pack("<b", value))
    elif kv_type == GGUFValueType.UINT16:
        fout.write(struct.pack("<H", value))
    elif kv_type == GGUFValueType.INT16:
        fout.write(struct.pack("<h", value))
    elif kv_type == GGUFValueType.UINT32:
        fout.write(struct.pack("<I", value))
    elif kv_type == GGUFValueType.INT32:
        fout.write(struct.pack("<i", value))
    elif kv_type == GGUFValueType.FLOAT32:
        fout.write(struct.pack("<f", value))
    elif kv_type == GGUFValueType.UINT64:
        fout.write(struct.pack("<Q", value))
    elif kv_type == GGUFValueType.INT64:
        fout.write(struct.pack("<q", value))
    elif kv_type == GGUFValueType.FLOAT64:
        fout.write(struct.pack("<d", value))


def write_array_value(fout, sub_type, arr):
    """Write an array KV value to the output file."""
    fout.write(struct.pack("<I", int(sub_type)))
    fout.write(struct.pack("<Q", len(arr)))
    for elem in arr:
        if sub_type == GGUFValueType.STRING:
            elem_bytes = elem.encode("utf-8")
            fout.write(struct.pack("<Q", len(elem_bytes)))
            fout.write(elem_bytes)
        elif sub_type in (GGUFValueType.UINT8, GGUFValueType.BOOL):
            fout.write(struct.pack("<B", elem))
        elif sub_type == GGUFValueType.INT8:
            # Signed format codes so negative values pack correctly
            fout.write(struct.pack("<b", elem))
        elif sub_type == GGUFValueType.UINT16:
            fout.write(struct.pack("<H", elem))
        elif sub_type == GGUFValueType.INT16:
            fout.write(struct.pack("<h", elem))
        elif sub_type == GGUFValueType.UINT32:
            fout.write(struct.pack("<I", elem))
        elif sub_type == GGUFValueType.INT32:
            fout.write(struct.pack("<i", elem))
        elif sub_type == GGUFValueType.FLOAT32:
            fout.write(struct.pack("<f", elem))
        elif sub_type == GGUFValueType.UINT64:
            fout.write(struct.pack("<Q", elem))
        elif sub_type == GGUFValueType.INT64:
            fout.write(struct.pack("<q", elem))
        elif sub_type == GGUFValueType.FLOAT64:
            fout.write(struct.pack("<d", elem))


def main() -> None:
    if len(sys.argv) != 4:
        print(
            f"Usage: {sys.argv[0]} <target.gguf> <source.gguf> <output.gguf>",
            file=sys.stderr,
        )
        sys.exit(1)
    target_path, source_path, output_path = sys.argv[1], sys.argv[2], sys.argv[3]

    # ------------------------------------------------------------------
    # 1. Open both files
    # ------------------------------------------------------------------
    print(f"Reading target: {target_path}")
    target_reader = GGUFReader(target_path)
    print(f"Reading source: {source_path}")
    source_reader = GGUFReader(source_path)
    target_file_size = Path(target_path).stat().st_size
    source_file_size = Path(source_path).stat().st_size
    print(
        f"  Target tensors: {len(target_reader.tensors)}, "
        f"KVs: {len([k for k in target_reader.fields if not k.startswith('GGUF.')])}"
    )
    print(
        f"  Source tensors: {len(source_reader.tensors)}, "
        f"KVs: {len([k for k in source_reader.fields if not k.startswith('GGUF.')])}"
    )

    # ------------------------------------------------------------------
    # 2. Read architecture and MTP metadata from source
    # ------------------------------------------------------------------
    arch = get_field_value(target_reader, "general.architecture")
    if arch is None:
        print("ERROR: Target GGUF has no general.architecture key")
        sys.exit(1)
    source_block_count = get_field_value(source_reader, f"{arch}.block_count")
    if source_block_count is None:
        print(f"ERROR: Source GGUF has no {arch}.block_count key")
        sys.exit(1)
    source_nextn = get_field_value(source_reader, f"{arch}.nextn_predict_layers")
    if source_nextn is None:
        print("ERROR: Source GGUF has no nextn_predict_layers key")
        sys.exit(1)
    target_block_count = get_field_value(target_reader, f"{arch}.block_count")
    print(f"\n  Arch: {arch}")
    print(f"  Target block_count: {target_block_count}")
    print(
        f"  Source block_count: {source_block_count}, nextn_predict_layers: {source_nextn}"
    )

    # Identify extra tensors in the source (blocks beyond the target's count)
    source_extra = [
        t
        for t in source_reader.tensors
        if t.name.startswith(f"blk.{target_block_count}.")
    ]
    print(f"\n  Extra tensors to transplant: {len(source_extra)}")
    if not source_extra:
        print(
            f"ERROR: No tensors found with prefix 'blk.{target_block_count}.' in source"
        )
        sys.exit(1)

    # ------------------------------------------------------------------
    # 3. Prepare tensor lists and calculate sizes
    # ------------------------------------------------------------------
    # Combine tensors: all from the target + the extra ones from the source
    all_tensors = list(target_reader.tensors) + source_extra

    # Calculate on-disk sizes for both files (including per-row metadata)
    target_on_disk_sizes = calculate_on_disk_sizes(
        target_reader.tensors, target_file_size
    )
    source_on_disk_sizes = calculate_on_disk_sizes(
        source_reader.tensors, source_file_size
    )

    # Map source tensor names to (tensor, on-disk size)
    source_tensor_map = {
        t.name: (t, size)
        for t, size in zip(source_reader.tensors, source_on_disk_sizes)
    }

    # ------------------------------------------------------------------
    # 4. Write output file
    # ------------------------------------------------------------------
    print(f"\nWriting output: {output_path}")
    with (
        open(target_path, "rb") as target_fin,
        open(source_path, "rb") as source_fin,
        open(output_path, "wb") as fout,
    ):
        # 4.1 Write header
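        # GGUF v3 header layout (all little-endian):
        #   magic     : 4 bytes, "GGUF"
        #   version   : u32
        #   n_tensors : u64
        #   n_kv      : u64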
        # Magic (4 bytes)
        fout.write(b"GGUF")
        # Version (4 bytes)
        fout.write(struct.pack("<I", 3))
        # Tensor count (8 bytes)
        fout.write(struct.pack("<Q", len(all_tensors)))

        # Calculate the KV count: every non-virtual target key is written
        # (block_count is re-emitted with the source's value), plus
        # nextn_predict_layers if the target does not already carry it
        kv_count = len(
            [k for k in target_reader.fields.keys() if not k.startswith("GGUF.")]
        )
        if f"{arch}.nextn_predict_layers" not in target_reader.fields:
            kv_count += 1
        # Add source-only KVs (excluding block_count and nextn_predict_layers)
        for key in source_reader.fields:
            if (
                not key.startswith("GGUF.")
                and key not in target_reader.fields
                and key != f"{arch}.block_count"
                and key != f"{arch}.nextn_predict_layers"
            ):
                kv_count += 1
        # KV count (8 bytes)
        fout.write(struct.pack("<Q", kv_count))

        # 4.2 Write KV data from target (with block_count override)
        written_keys = set()
        for key, field in target_reader.fields.items():
            if key.startswith("GGUF."):
                continue
            # Skip keys that are re-emitted from the source below
            if key in (f"{arch}.block_count", f"{arch}.nextn_predict_layers"):
                continue
            # Write key
            key_bytes = key.encode("utf-8")
            fout.write(struct.pack("<Q", len(key_bytes)))
            fout.write(key_bytes)
            # Write type
            kv_type = field.types[0]
            fout.write(struct.pack("<I", int(kv_type)))
            # Write value
            if kv_type == GGUFValueType.ARRAY:
                sub_type = (
                    field.types[1] if len(field.types) > 1 else GGUFValueType.FLOAT32
                )
                write_array_value(fout, sub_type, field.contents())
            else:
                write_kv_value(fout, kv_type, field.contents())
            written_keys.add(key)

        # Add block_count from source
        key = f"{arch}.block_count"
        key_bytes = key.encode("utf-8")
        fout.write(struct.pack("<Q", len(key_bytes)))
        fout.write(key_bytes)
        fout.write(struct.pack("<I", int(GGUFValueType.UINT32)))
        fout.write(struct.pack("<I", source_block_count))
        written_keys.add(key)

        # Add nextn_predict_layers from source
        key = f"{arch}.nextn_predict_layers"
        key_bytes = key.encode("utf-8")
        fout.write(struct.pack("<Q", len(key_bytes)))
        fout.write(key_bytes)
        fout.write(struct.pack("<I", int(GGUFValueType.UINT32)))
        fout.write(struct.pack("<I", source_nextn))
        written_keys.add(key)

        # Copy source-only KVs (everything already emitted is in written_keys)
        for key, field in source_reader.fields.items():
            if key.startswith("GGUF.") or key in written_keys:
                continue
            # Write key
            key_bytes = key.encode("utf-8")
            fout.write(struct.pack("<Q", len(key_bytes)))
            fout.write(key_bytes)
            # Write type
            kv_type = field.types[0]
            fout.write(struct.pack("<I", int(kv_type)))
            # Write value
            if kv_type == GGUFValueType.ARRAY:
                sub_type = (
                    field.types[1] if len(field.types) > 1 else GGUFValueType.FLOAT32
                )
                write_array_value(fout, sub_type, field.contents())
            else:
                write_kv_value(fout, kv_type, field.contents())

        # 4.3 Write tensor info
        # Calculate offsets for all tensors, relative to the start of the
        # aligned tensor-data section
        current_offset = 0
        tensor_offsets = []
        for i, tensor in enumerate(all_tensors):
            if i < len(target_reader.tensors):
                size = target_on_disk_sizes[i]
            else:
                _, size = source_tensor_map[tensor.name]
            tensor_offsets.append(current_offset)
            current_offset += size

        # Write tensor info for each tensor
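        # Each tensor-info record: name (u64 length + UTF-8 bytes),
        # n_dims (u32), each dimension (u64), ggml type id (u32), and
        # data offset (u64) within the tensor-data section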
        for i, tensor in enumerate(all_tensors):
            # Tensor name
            name_bytes = tensor.name.encode("utf-8")
            fout.write(struct.pack("<Q", len(name_bytes)))
            fout.write(name_bytes)
            # Dimensions (in GGUF file order: fastest-varying first)
            shape = tensor.shape.tolist()
            fout.write(struct.pack("<I", len(shape)))
            for dim in shape:
                fout.write(struct.pack("<Q", dim))
            # Quantization type
            fout.write(struct.pack("<I", int(tensor.tensor_type)))
            # Offset
            fout.write(struct.pack("<Q", tensor_offsets[i]))

        # 4.4 Pad to alignment if needed
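        # Tensor data must start on a general.alignment boundary
        # (the GGUF default is 32 bytes), so zero-pad the metadata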
        current_pos = fout.tell()
        alignment = get_field_value(target_reader, "general.alignment") or 32
        padding_needed = (alignment - (current_pos % alignment)) % alignment
        if padding_needed:
            fout.write(b"\x00" * padding_needed)

        # 4.5 Copy tensor data
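        # Raw bytes are copied verbatim, so quantized payloads (including
        # any per-row metadata) are never decoded or re-encoded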
| print(f"Copying {len(all_tensors)} tensors...") | |
| for i, tensor in enumerate(all_tensors): | |
| if i < len(target_reader.tensors): | |
| # Target tensor | |
| offset = target_reader.tensors[i].data_offset | |
| size = target_on_disk_sizes[i] | |
| fin = target_fin | |
| else: | |
| # Source extra tensor | |
| src_tensor, size = source_tensor_map[tensor.name] | |
| offset = src_tensor.data_offset | |
| fin = source_fin | |
| fin.seek(offset) | |
| raw_data = fin.read(size) | |
| fout.write(raw_data) | |
| if (i + 1) % 50 == 0 or i == len(all_tensors) - 1: | |
| print(f" Copied {i + 1}/{len(all_tensors)} tensors") | |
| # ------------------------------------------------------------------ | |
| # 5. Verify output | |
| # ------------------------------------------------------------------ | |
| output_size = Path(output_path).stat().st_size | |
| print(f"\nOutput: {output_path}") | |
| print(f" Size: {output_size / 1_000_000_000:.2f} GB") | |
| print(f" Tensors: {len(all_tensors)}") | |
| # Validate | |
| print("\nValidating output...") | |
| errors = [] | |
| try: | |
| out_reader = GGUFReader(output_path) | |
| # Check block_count | |
| out_block_count = get_field_value(out_reader, f"{arch}.block_count") | |
| if out_block_count != source_block_count: | |
| errors.append( | |
| f"block_count: expected {source_block_count}, got {out_block_count}" | |
| ) | |
| # Check nextn_predict_layers | |
| out_nextn = get_field_value(out_reader, f"{arch}.nextn_predict_layers") | |
| if out_nextn != source_nextn: | |
| errors.append( | |
| f"nextn_predict_layers: expected {source_nextn}, got {out_nextn}" | |
| ) | |
| # Check extra tensors exist | |
| out_tensor_names = {t.name for t in out_reader.tensors} | |
| for tensor in source_extra: | |
| if tensor.name not in out_tensor_names: | |
| errors.append(f"Missing tensor: {tensor.name}") | |
| # Spot-check tensor data integrity | |
| print(" Spot-checking tensor data integrity...") | |
| out_tensors = {t.name: t for t in out_reader.tensors} | |
| # Check a target tensor | |
| for name in ["token_embd.weight"]: | |
| if name in out_tensors and name in {t.name for t in target_reader.tensors}: | |
| target_t = next( | |
| (t for t in target_reader.tensors if t.name == name), None | |
| ) | |
| out_t = out_tensors.get(name) | |
| if target_t and out_t: | |
| target_hash = hashlib.sha256(target_t.data.tobytes()).hexdigest()[ | |
| :16 | |
| ] | |
| out_hash = hashlib.sha256(out_t.data.tobytes()).hexdigest()[:16] | |
| if target_hash == out_hash: | |
| print(f" {name}: OK ({out_hash})") | |
| else: | |
| errors.append(f"Data mismatch: {name}") | |
| # Check an extra tensor | |
| if source_extra: | |
| extra_name = source_extra[0].name | |
| source_t = source_tensor_map[extra_name][0] | |
| out_t = out_tensors.get(extra_name) | |
| if out_t: | |
| source_hash = hashlib.sha256(source_t.data.tobytes()).hexdigest()[:16] | |
| out_hash = hashlib.sha256(out_t.data.tobytes()).hexdigest()[:16] | |
| if source_hash == out_hash: | |
| print(f" {extra_name}: OK ({out_hash})") | |
| else: | |
| errors.append(f"Data mismatch: {extra_name}") | |
| except Exception as e: | |
| errors.append(f"Failed to read output: {e}") | |
| if errors: | |
| print("\nVALIDATION FAILED:") | |
| for err in errors: | |
| print(f" - {err}") | |
| sys.exit(1) | |
| else: | |
| print(" OK — all checks passed") | |
| print(f"\nDone. Output: {output_path}") | |
| if __name__ == "__main__": | |
| main() |