Last active
April 25, 2022 20:35
-
-
Save yaoyaoding/3ad5850cb142f3b77855bfec284a71e0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import List, Union | |
import json | |
from collections import defaultdict | |
class Model:
    """Top-level container of the exported document.

    Holds a list of graphs (seeded with the single graph given to the
    constructor) together with free-form metadata strings. The ``format``
    attribute is always set to ``'netron'``.
    """

    # Metadata attributes copied verbatim into the exported dictionary,
    # in the order they appear after the 'graphs' entry.
    _METADATA_FIELDS = ('description', 'author', 'company', 'license', 'domain', 'source', 'format')

    def __init__(self, graph, description="", author="", company="", license="", domain="", source=""):
        """Store ``graph`` as the only entry of ``self.graphs`` and keep the metadata."""
        self.graphs: List[Graph] = [graph]
        self.description: str = description
        self.author: str = author
        self.company: str = company
        self.license: str = license
        self.domain: str = domain
        self.source: str = source
        self.format: str = 'netron'

    def export(self):
        """Return a plain dictionary: the exported graphs followed by the metadata."""
        exported = {'graphs': [g.export() for g in self.graphs]}
        for field in self._METADATA_FIELDS:
            exported[field] = getattr(self, field)
        return exported
class Graph:
    """A named computation graph: input parameters, output parameters and nodes."""

    def __init__(self, inputs, outputs, nodes, name=""):
        """Keep references to the given lists (they are not copied)."""
        self.inputs: List[Parameter] = inputs
        self.outputs: List[Parameter] = outputs
        self.nodes: List[Node] = nodes
        self.name: str = name

    def export(self):
        """Return a dictionary with the name and the exported inputs, outputs and nodes."""
        exported = {'name': self.name}
        # Each section is the list of whatever the members' own export() returns.
        for section in ('inputs', 'outputs', 'nodes'):
            exported[section] = [member.export() for member in getattr(self, section)]
        return exported
class Parameter:
    """A named slot that is bound to an argument.

    The single argument passed to the constructor is stored as a
    one-element list in ``self.arguments``.
    """

    def __init__(self, name, argument, visible=True):
        self.name: str = name
        self.arguments: List[Argument] = [argument]
        self.visible: bool = visible

    def export(self):
        """Return the name, the exported arguments and the visibility flag."""
        exported_arguments = [argument.export() for argument in self.arguments]
        return dict(name=self.name, arguments=exported_arguments, visible=self.visible)
class Argument:
    """A named value with a data type and a shape.

    When ``has_initializer`` is true the exported dictionary gains an
    ``'initializer'`` entry; its value is ``str(scalar_value)`` for a
    zero-dimensional argument with a known scalar, and ``'<>'`` otherwise.
    """

    def __init__(self, name, data_type, shape: List[int], has_initializer=False, scalar_value=None):
        self.name: str = name
        self.data_type: str = data_type
        self.shape: List[int] = shape
        self.has_initializer: bool = has_initializer
        self.scalar_value = scalar_value

    def export(self):
        """Return the dictionary form of this argument."""
        type_info = {
            "string": f'{self.data_type}{self.shape}',
            "shape": {'dimensions': self.shape},
            "dataType": self.data_type,
        }
        exported = {'name': self.name, 'type': type_info}

        if not self.has_initializer:
            return exported

        # Only a scalar (empty shape) with a known value is rendered;
        # everything else gets the '<>' placeholder.
        is_known_scalar = len(self.shape) == 0 and self.scalar_value is not None
        rendered = str(self.scalar_value) if is_known_scalar else '<>'
        exported['initializer'] = {'kind': 'Initializer', 'value': rendered}
        return exported
class Node:
    """An operator in the graph, with inputs, outputs, attributes and a category."""

    # Per the original author's note, the category influences the color
    # netron uses for the node. Maps category -> operator type names.
    categories = {
        'layer': ['Conv2d'],
        'constant': [],
        'activation': ['Relu'],
        'pool': ['MaxPool2d', 'AvgPool2d'],
        'normalization': [],
        'dropout': [],
        'transform': ['Squeeze', 'Unsqueeze', 'Add', 'Sub', 'Multiply', 'Rsqrt'],
        'custom': [],
    }

    def __init__(self, name, type_name, inputs, outputs, attributes, category=None, description=''):
        """Build a node; when ``category`` is None it is inferred from ``type_name``.

        Inference rules: a type name starting with ``'Fused'`` gets the
        ``'dropout'`` category; otherwise the first category in
        ``categories`` listing the type name is used, and the category
        stays ``None`` when no entry matches.
        The description is stored split into lines.
        """
        self.name: str = name
        self.type_name: str = type_name
        self.inputs: List[Parameter] = inputs
        self.outputs: List[Parameter] = outputs
        self.attributes: List[Attribute] = attributes
        self.description: Union[List[str], str] = description.split('\n')

        if category is None:
            if type_name.startswith('Fused'):
                category = 'dropout'
            else:
                category = next(
                    (label for label, op_names in self.categories.items() if type_name in op_names),
                    None,
                )
        self.category = category

    def export(self):
        """Return the dictionary form of this node."""
        type_info = {'name': self.type_name, 'category': self.category}
        exported_inputs = [parameter.export() for parameter in self.inputs]
        exported_outputs = [parameter.export() for parameter in self.outputs]
        exported_attributes = [attribute.export() for attribute in self.attributes]
        return {
            'name': self.name,
            'type': type_info,
            'inputs': exported_inputs,
            'outputs': exported_outputs,
            'attributes': exported_attributes,
            'description': self.description,
        }
class Attribute:
    """A named attribute of a node: a type name, a value and display metadata."""

    def __init__(self, name, type_name: str, value: str, visible=True, description=""):
        self.name: str = name
        self.type_name: str = type_name
        self.value: str = value
        self.visible: bool = visible
        self.description: str = description

    def export(self):
        """Return the dictionary form; ``type_name`` is emitted under the ``'type'`` key."""
        return dict(
            name=self.name,
            type=self.type_name,
            value=self.value,
            visible=self.visible,
            description=self.description,
        )
def type_string_of(value):
    """Return a short type label for ``value``.

    Lists and tuples are labelled ``'Sequence[<type of first element>]'``,
    or ``'Sequence[]'`` when empty; any other value is labelled with the
    name of its type.
    """
    if not isinstance(value, (list, tuple)):
        return type(value).__name__
    if len(value) == 0:
        return 'Sequence[]'
    # Only the first element is inspected; the rest are not checked.
    first_type_name = type(value[0]).__name__
    return f'Sequence[{first_type_name}]'
def dump(flow_graph, fp):
    """Convert a hidet ``FlowGraph`` to the model format and write it to ``fp`` as JSON.

    ``FlowGraph`` is hidet's computation graph; it is unrelated to the
    format itself and only serves as the conversion source here.

    Naming scheme used for the generated arguments:
      * graph inputs:  ``input:<index>``
      * constants:     ``const:<running counter>`` (node inputs whose
        ``storage`` is not None; a constant shared by several nodes is
        registered once)
      * node outputs:  ``<node name>:<index>``, where the node name is the
        operator name followed by a per-operator running number.
    """
    from hidet import FlowGraph

    assert isinstance(flow_graph, FlowGraph)
    flow_graph.update_nodes()

    argument_of = {}                 # tensor -> Argument describing it
    type_counter = defaultdict(int)  # operator name -> number of nodes seen so far

    graph_inputs = []
    for position, tensor in enumerate(flow_graph.inputs):
        label = f'input:{position}'
        arg = Argument(label, data_type=tensor.dtype, shape=tensor.shape, has_initializer=False)
        argument_of[tensor] = arg
        graph_inputs.append(Parameter(label, arg))

    graph_nodes = []
    num_constants = 0
    for node in flow_graph.nodes:
        op_type = node.name
        type_counter[op_type] += 1
        op_name = f'{op_type}{type_counter[op_type]}'

        # Register constant inputs that have not been seen yet.
        for tensor in node.inputs:
            if tensor.storage is None or tensor in argument_of:
                # Either not a constant, or a constant shared by multiple nodes.
                continue
            label = f'const:{num_constants}'
            num_constants += 1
            if len(tensor.shape) == 0 and tensor.storage:
                scalar = str(tensor.cpu().numpy())
            else:
                scalar = None
            argument_of[tensor] = Argument(
                label,
                data_type=tensor.dtype,
                shape=tensor.shape,
                has_initializer=True,
                scalar_value=scalar,
            )

        # Register the tensors produced by this node.
        for position, tensor in enumerate(node.outputs):
            argument_of[tensor] = Argument(
                f'{op_name}:{position}',
                data_type=tensor.dtype,
                shape=tensor.shape,
                has_initializer=False,
            )

        input_params = [Parameter(str(i), argument_of[t]) for i, t in enumerate(node.inputs)]
        output_params = [Parameter(str(i), argument_of[t]) for i, t in enumerate(node.outputs)]
        attrs = [Attribute(key, type_string_of(val), str(val)) for key, val in node.attributes.items()]
        graph_nodes.append(Node(
            name=op_name,
            type_name=op_type,
            inputs=input_params,
            outputs=output_params,
            attributes=attrs,
            description=str(node.task),
        ))

    graph_outputs = [
        Parameter(f'output:{position}', argument_of[tensor])
        for position, tensor in enumerate(flow_graph.outputs)
    ]

    graph = Graph(graph_inputs, graph_outputs, graph_nodes, name="")
    model = Model(graph, source='Hidet', description='Converted from FlowGraph')
    json.dump(model.export(), fp, indent=2)
def demo(path='model_sample.json'):
    """Build a small hand-written model and write it to ``path`` as JSON.

    Think of an argument as a tensor and a node as an operator. Each
    operator has inputs, outputs and attributes. A parameter's name is the
    name of the slot on the operator itself (e.g. ``lhs``), and each
    parameter is associated with an argument. An argument's name is the
    name of the tensor, i.e. an edge of the computation graph. An argument
    that is a weight (a tensor whose value is known before execution)
    should be created with ``has_initializer=True``, as ``weight:0`` is
    below.

    The sample graph computes ``Add1(Add0(input:0, input:1), weight:0)``.

    Args:
        path: Destination file for the JSON document. Defaults to
            ``'model_sample.json'``, the file name that used to be
            hard-coded.
    """
    # All tensors (graph edges) of the sample, keyed by their name.
    name2arguments = {
        'input:0': Argument('input:0', data_type='data_type_of_input', shape=[3, 4]),
        'input:1': Argument('input:1', data_type='float32', shape=[]),
        'Add0:0': Argument('Add0:0', data_type='float32', shape=[3, 4]),
        'weight:0': Argument('weight:0', data_type='weight_dtype', shape=[3, 4], has_initializer=True),
        'Add1:0': Argument('Add1:0', data_type='float32', shape=[3, 4]),
    }

    inputs = [
        Parameter('input:0', name2arguments['input:0']),
        Parameter('input:1', name2arguments['input:1']),
    ]

    add0 = Node(
        'Add0',
        type_name='Add',
        inputs=[
            Parameter('lhs', name2arguments['input:0']),
            Parameter('rhs', name2arguments['input:1']),
        ],
        outputs=[
            Parameter('output', name2arguments['Add0:0']),
        ],
        attributes=[
            Attribute('AttributeName', 'AttributeTypeName', 'AttributeValue'),
        ],
    )
    add1 = Node(
        'Add1',
        type_name='Add',
        inputs=[
            Parameter('lhs', name2arguments['Add0:0']),
            Parameter('rhs', name2arguments['weight:0']),
        ],
        outputs=[
            Parameter('output', name2arguments['Add1:0']),
        ],
        attributes=[
            Attribute('AttributeName', 'AttributeTypeName', 'AttributeValue'),
        ],
    )
    nodes = [add0, add1]

    outputs = [Parameter('graph_output', name2arguments['Add1:0'])]

    graph = Graph(inputs, outputs, nodes, name='graph_name')
    model = Model(graph, source='hand-written', description='A demo to use this format.')
    with open(path, 'w') as f:
        json.dump(model.export(), f, indent=2)
# Script entry point: running this file directly writes the sample model.
if __name__ == "__main__":
    demo()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment