|
import coremltools as ct |
|
import sys |
|
from dataclasses import dataclass |
|
|
|
""" |
|
Simple script to expose an intermediate tensor |
|
as an output of a CoreML model. |
|
|
|
Useful for debugging without modifying the source |
|
model and re-converting it repeatedly. |
|
|
|
Usage: python add_debug_output.py [path to mlpackage] [output_name1] [output_name2] ... |
|
Open the model in https://netron.app to find an output name. |
|
""" |
|
|
|
@dataclass |
|
class KeyedOperation: |
|
function_name: str |
|
block_specialization_name: str |
|
operation: any |
|
|
|
def walk_block(block): |
|
for operation in block.operations: |
|
yield operation |
|
# todo: Walk operation.blocks recursively to support loops and conditionals. |
|
|
|
def walk_spec(spec): |
|
for functionName, function in spec.mlProgram.functions.items(): |
|
for blockSpecializationName, block in function.block_specializations.items(): |
|
for operation in walk_block(block): |
|
yield KeyedOperation(functionName, blockSpecializationName, operation) |
|
|
|
def value_type_to_feature_type(value_type): |
|
multi_array = ct.proto.FeatureTypes_pb2.ArrayFeatureType() |
|
|
|
shape = [d.constant.size for d in value_type.tensorType.dimensions] |
|
multi_array.shape.extend(shape) |
|
|
|
if value_type.tensorType.dataType == ct.proto.MIL_pb2.DataType.FLOAT16: |
|
multi_array.dataType = ct.proto.FeatureTypes_pb2.ArrayFeatureType.ArrayDataType.FLOAT16 |
|
elif value_type.tensorType.dataType == ct.proto.MIL_pb2.DataType.FLOAT32: |
|
multi_array.dataType = ct.proto.FeatureTypes_pb2.ArrayFeatureType.ArrayDataType.FLOAT32 |
|
elif value_type.tensorType.dataType == ct.proto.MIL_pb2.DataType.INT32: |
|
multi_array.dataType = ct.proto.FeatureTypes_pb2.ArrayFeatureType.ArrayDataType.INT32 |
|
else: |
|
raise ValueError(f"Unsupported output data type: {ct.proto.MIL_pb2.DataType.Name(value_type.tensorType.dataType)}") |
|
|
|
feat_type = ct.proto.Model_pb2.FeatureType() |
|
feat_type.multiArrayType.CopyFrom(multi_array) |
|
return feat_type |
|
|
|
if len(sys.argv) < 3: |
|
print(f"Usage: python {sys.argv[0]} [path to mlpackage] [output_name1] [output_name2] ...") |
|
sys.exit(1) |
|
|
|
model_path = sys.argv[1] |
|
new_output_names = sys.argv[2:] |
|
|
|
# Example of how to browse the spec. |
|
# protoc --decode CoreML.Specification.Model -I ~/path/to/coremltools/mlmodel/format/ ~/path/to/coremltools/mlmodel/format/Model.proto < /path/to/your.mlpackage/Data/com.apple.CoreML/model.mlmodel | less |
|
|
|
model = ct.models.MLModel(model_path, skip_model_load=True) |
|
spec = model.get_spec() |
|
|
|
added = [] |
|
original_output_names = [o.name for o in spec.description.output] |
|
|
|
already_output_names = set(original_output_names) & set(new_output_names) |
|
new_output_names = list(set(new_output_names) - already_output_names) |
|
if len(already_output_names) > 0: |
|
if len(new_output_names) == 0: |
|
print(f"All requested names are already outputs, nothing to do.") |
|
sys.exit(1) |
|
else: |
|
print(f"Some requested names are already outputs, skipping them: {list(already_output_names)}") |
|
|
|
for keyed_op in walk_spec(spec): |
|
for output in keyed_op.operation.outputs: |
|
if output.name in new_output_names: |
|
spec.mlProgram.functions[keyed_op.function_name].block_specializations[keyed_op.block_specialization_name].outputs.extend([output.name]) |
|
|
|
feat_desc = ct.proto.Model_pb2.FeatureDescription() |
|
feat_desc.name = output.name |
|
feat_desc.type.CopyFrom(value_type_to_feature_type(output.type)) |
|
spec.description.output.extend([feat_desc]) |
|
|
|
added.append(output.name) |
|
|
|
if len(added) == 0: |
|
print(f"Found none of these outputs in the model: {new_output_names}") |
|
print("Exiting without saving.") |
|
sys.exit(1) |
|
|
|
debug_model = ct.models.MLModel(spec, weights_dir=model.weights_dir) |
|
output_path = model_path.replace(".mlpackage", "_debug.mlpackage") |
|
debug_model.save(output_path) |
|
|
|
print("Added outputs : ", added) |
|
print("Saved to : ", output_path) |