Last active
April 13, 2023 22:41
-
-
Save vuiseng9/d049e5cc87078f9d68e48784121ac22e to your computer and use it in GitHub Desktop.
tranformer_block_tracing via NNCF
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools | |
from typing import Dict, Callable, Any, Union, List, Tuple | |
import torch | |
import torch.nn as nn | |
from torch.utils.data import DataLoader | |
from torch.utils.data import Dataset | |
from nncf.torch.nncf_network import NNCFNetwork | |
from nncf.torch.dynamic_graph.graph_tracer import create_input_infos, create_dummy_forward_fn | |
from nncf.torch import create_compressed_model | |
from nncf import NNCFConfig | |
from nncf.torch.initialization import register_default_init_args | |
from abc import ABC, abstractmethod | |
from nncf.experimental.torch.search_building_blocks.search_blocks import get_building_blocks, get_building_blocks_info | |
from transformers import AutoModelForQuestionAnswering | |
class BaseDatasetMock(Dataset, ABC): | |
def __init__(self, input_size: Tuple, num_samples: int = 10): | |
super().__init__() | |
self._input_size = input_size | |
self._len = num_samples | |
@abstractmethod | |
def __getitem__(self, index: int) -> Tuple[torch.Tensor, torch.Tensor]: | |
pass | |
def __len__(self) -> int: | |
return self._len | |
class RandomDatasetMock(BaseDatasetMock): | |
def __getitem__(self, index): | |
return torch.rand(self._input_size), torch.zeros(1) | |
def create_any_mock_dataloader(dataset_cls: type, config: NNCFConfig, num_samples: int = 1, | |
batch_size: int = 1) -> DataLoader: | |
input_infos_list = create_input_infos(config) | |
input_sample_size = input_infos_list[0].shape | |
data_loader = DataLoader(dataset_cls(input_sample_size[1:], num_samples), | |
batch_size=batch_size, | |
num_workers=0, # Workaround | |
shuffle=False, drop_last=True) | |
return data_loader | |
create_random_mock_dataloader = functools.partial(create_any_mock_dataloader, dataset_cls=RandomDatasetMock) | |
bs=1 | |
nbatch = 3200 # dummy | |
seqlen = 384 | |
model_path = 'bert-base-uncased' | |
model = AutoModelForQuestionAnswering.from_pretrained(model_path) | |
nncf_cfgdict = dict( | |
input_info = [dict(sample_size=[bs, seqlen], type='long')]*3, | |
) | |
nncf_cfg = NNCFConfig.from_dict(nncf_cfgdict) | |
mock_dataloader = create_random_mock_dataloader(config=nncf_cfg, num_samples=nbatch*bs, batch_size=bs) | |
nncf_cfg = register_default_init_args(nncf_cfg, mock_dataloader) | |
nncf_ctrl, nncf_model = create_compressed_model(model, nncf_cfg) | |
blocks, _, group_dependent = get_building_blocks(nncf_model, allow_nested_blocks=False) | |
blocks_info = get_building_blocks_info(blocks, nncf_model) | |
DETAIL=False | |
g=nncf_model.get_graph() | |
for i, bbi in enumerate(blocks_info): | |
op_addresses_ids = list(map(lambda x: g.get_node_by_name(x.__str__()).node_id, bbi.op_addresses)) | |
print("\n- {} | {} ---".format(i, bbi.block_type)) | |
print("start : {}\nend : {}\nNodeIDs:{}\n|\n".format( | |
bbi.building_block.start_node_name, | |
bbi.building_block.end_node_name, | |
op_addresses_ids)) | |
if DETAIL is True: | |
for oi, opaddr in enumerate(bbi.op_addresses): | |
print("NodeID {:3}: {}".format(g.get_node_by_name(opaddr.__str__()).node_id, opaddr)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools | |
from typing import Dict, Callable, Any, Union, List, Tuple | |
import torch | |
import torch.nn as nn | |
from torch.utils.data import DataLoader | |
from torch.utils.data import Dataset | |
from nncf.torch.nncf_network import NNCFNetwork | |
from nncf.torch.dynamic_graph.graph_tracer import create_input_infos, create_dummy_forward_fn | |
from nncf.torch import create_compressed_model | |
from nncf import NNCFConfig | |
from nncf.torch.initialization import register_default_init_args | |
from abc import ABC, abstractmethod | |
from nncf.experimental.torch.search_building_blocks.search_blocks import get_building_blocks, get_building_blocks_info | |
from transformers import AutoModelForImageClassification | |
class BaseDatasetMock(Dataset, ABC): | |
def __init__(self, input_size: Tuple, num_samples: int = 10): | |
super().__init__() | |
self._input_size = input_size | |
self._len = num_samples | |
@abstractmethod | |
def __getitem__(self, index: int) -> Tuple[torch.Tensor, torch.Tensor]: | |
pass | |
def __len__(self) -> int: | |
return self._len | |
class RandomDatasetMock(BaseDatasetMock): | |
def __getitem__(self, index): | |
return torch.rand(self._input_size), torch.zeros(1) | |
def create_any_mock_dataloader(dataset_cls: type, config: NNCFConfig, num_samples: int = 1, | |
batch_size: int = 1) -> DataLoader: | |
input_infos_list = create_input_infos(config) | |
input_sample_size = input_infos_list[0].shape | |
data_loader = DataLoader(dataset_cls(input_sample_size[1:], num_samples), | |
batch_size=batch_size, | |
num_workers=0, # Workaround | |
shuffle=False, drop_last=True) | |
return data_loader | |
create_random_mock_dataloader = functools.partial(create_any_mock_dataloader, dataset_cls=RandomDatasetMock) | |
bs=8 | |
nbatch = 3200 # dummy | |
model_path = 'google/vit-base-patch16-224' | |
model = AutoModelForImageClassification.from_pretrained(model_path) | |
nncf_cfgdict = dict( | |
input_info = dict(sample_size=[1, 3, 224, 224]), | |
) | |
nncf_cfg = NNCFConfig.from_dict(nncf_cfgdict) | |
mock_dataloader = create_random_mock_dataloader(config=nncf_cfg, num_samples=nbatch*bs, batch_size=bs) | |
nncf_cfg = register_default_init_args(nncf_cfg, mock_dataloader) | |
nncf_ctrl, nncf_model = create_compressed_model(model, nncf_cfg) | |
blocks, _, group_dependent = get_building_blocks(nncf_model, allow_nested_blocks=False) | |
blocks_info = get_building_blocks_info(blocks, nncf_model) | |
DETAIL=False | |
g=nncf_model.get_graph() | |
for i, bbi in enumerate(blocks_info): | |
op_addresses_ids = list(map(lambda x: g.get_node_by_name(x.__str__()).node_id, bbi.op_addresses)) | |
print("\n- {} | {} ---".format(i, bbi.block_type)) | |
print("start : {}\nend : {}\nNodeIDs:{}\n|\n".format( | |
bbi.building_block.start_node_name, | |
bbi.building_block.end_node_name, | |
op_addresses_ids)) | |
if DETAIL is True: | |
for oi, opaddr in enumerate(bbi.op_addresses): | |
print("NodeID {:3}: {}".format(g.get_node_by_name(opaddr.__str__()).node_id, opaddr)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools | |
from typing import Dict, Callable, Any, Union, List, Tuple | |
import torch | |
import torch.nn as nn | |
from torch.utils.data import DataLoader | |
from torch.utils.data import Dataset | |
from nncf.torch.nncf_network import NNCFNetwork | |
from nncf.torch.dynamic_graph.graph_tracer import create_input_infos, create_dummy_forward_fn | |
from nncf.torch import create_compressed_model | |
from nncf import NNCFConfig | |
from nncf.torch.initialization import register_default_init_args | |
from abc import ABC, abstractmethod | |
from nncf.experimental.torch.search_building_blocks.search_blocks import get_building_blocks, get_building_blocks_info | |
from transformers import AutoModelForAudioClassification | |
class BaseDatasetMock(Dataset, ABC): | |
def __init__(self, input_size: Tuple, num_samples: int = 10): | |
super().__init__() | |
self._input_size = input_size | |
self._len = num_samples | |
@abstractmethod | |
def __getitem__(self, index: int) -> Tuple[torch.Tensor, torch.Tensor]: | |
pass | |
def __len__(self) -> int: | |
return self._len | |
class RandomDatasetMock(BaseDatasetMock): | |
def __getitem__(self, index): | |
return torch.rand(self._input_size), torch.zeros(1) | |
def create_any_mock_dataloader(dataset_cls: type, config: NNCFConfig, num_samples: int = 1, | |
batch_size: int = 1) -> DataLoader: | |
input_infos_list = create_input_infos(config) | |
input_sample_size = input_infos_list[0].shape | |
data_loader = DataLoader(dataset_cls(input_sample_size[1:], num_samples), | |
batch_size=batch_size, | |
num_workers=0, # Workaround | |
shuffle=False, drop_last=True) | |
return data_loader | |
create_random_mock_dataloader = functools.partial(create_any_mock_dataloader, dataset_cls=RandomDatasetMock) | |
bs=1 | |
n_sample=16000 | |
nbatch = 320000 # dummy | |
model_path = 'anton-l/wav2vec2-base-ft-keyword-spotting' | |
model = AutoModelForAudioClassification.from_pretrained(model_path) | |
nncf_cfgdict = dict( | |
input_info = dict(sample_size=[bs, n_sample]), | |
# compression = dict(algorithm="quantization") | |
) | |
nncf_cfg = NNCFConfig.from_dict(nncf_cfgdict) | |
mock_dataloader = create_random_mock_dataloader(config=nncf_cfg, num_samples=nbatch*bs, batch_size=bs) | |
nncf_cfg = register_default_init_args(nncf_cfg, mock_dataloader) | |
nncf_ctrl, nncf_model = create_compressed_model(model, nncf_cfg) | |
nncf_model.eval() | |
blocks, _, group_dependent = get_building_blocks(nncf_model, allow_nested_blocks=False) | |
blocks_info = get_building_blocks_info(blocks, nncf_model) | |
DETAIL=False | |
g=nncf_model.get_graph() | |
for i, bbi in enumerate(blocks_info): | |
op_addresses_ids = list(map(lambda x: g.get_node_by_name(x.__str__()).node_id, bbi.op_addresses)) | |
print("\n- {} | {} ---".format(i, bbi.block_type)) | |
print("start : {}\nend : {}\nNodeIDs:{}\n|\n".format( | |
bbi.building_block.start_node_name, | |
bbi.building_block.end_node_name, | |
op_addresses_ids)) | |
if DETAIL is True: | |
for oi, opaddr in enumerate(bbi.op_addresses): | |
print("NodeID {:3}: {}".format(g.get_node_by_name(opaddr.__str__()).node_id, opaddr)) |
Wrap mobilebert
import functools
from typing import Dict, Callable, Any, Union, List, Tuple
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from nncf.torch.nncf_network import NNCFNetwork
from nncf.torch.dynamic_graph.graph_tracer import create_input_infos, create_dummy_forward_fn
from nncf.torch import create_compressed_model
from nncf import NNCFConfig
from nncf.torch.initialization import register_default_init_args
from abc import ABC, abstractmethod
from nncf.experimental.torch.search_building_blocks.search_blocks import BuildingBlock, get_building_blocks, BuildingBlockType, BlockFilteringStrategy
from transformers import AutoModelForQuestionAnswering
class BaseDatasetMock(Dataset, ABC):
def __init__(self, input_size: Tuple, num_samples: int = 10):
super().__init__()
self._input_size = input_size
self._len = num_samples
@abstractmethod
def __getitem__(self, index: int) -> Tuple[torch.Tensor, torch.Tensor]:
pass
def __len__(self) -> int:
return self._len
class RandomDatasetMock(BaseDatasetMock):
def __getitem__(self, index):
return torch.rand(self._input_size), torch.zeros(1)
def create_any_mock_dataloader(dataset_cls: type, config: NNCFConfig, num_samples: int = 1,
batch_size: int = 1) -> DataLoader:
input_infos_list = create_input_infos(config)
input_sample_size = input_infos_list[0].shape
data_loader = DataLoader(dataset_cls(input_sample_size[1:], num_samples),
batch_size=batch_size,
num_workers=0, # Workaround
shuffle=False, drop_last=True)
return data_loader
create_random_mock_dataloader = functools.partial(create_any_mock_dataloader, dataset_cls=RandomDatasetMock)
bs=1
nbatch = 3200 # dummy
seqlen = 384
model_path = 'csarron/mobilebert-uncased-squad-v1'
model = AutoModelForQuestionAnswering.from_pretrained(model_path)
nncf_cfgdict = dict(
input_info = [dict(sample_size=[bs, seqlen], type='long')]*3,
)
nncf_cfg = NNCFConfig.from_dict(nncf_cfgdict)
mock_dataloader = create_random_mock_dataloader(config=nncf_cfg, num_samples=nbatch*bs, batch_size=bs)
nncf_cfg = register_default_init_args(nncf_cfg, mock_dataloader)
nncf_ctrl, nncf_model = create_compressed_model(model, nncf_cfg)
building_blocks, _ = get_building_blocks(nncf_model,
target_block_types=[BuildingBlockType.MSHA, BuildingBlockType.FF],
block_filter_strategy=BlockFilteringStrategy.KEEP_SMALL,
hw_fused_ops=True)
for idx, block in enumerate(building_blocks):
print("\n- Block {} | {} -".format(idx, block.block_type))
print("\t__start_node: {}".format(block.start_node_name))
print("\t____end_node: {}".format(block.end_node_name))
print("\top_addresses:")
for i, op in enumerate(block.op_addresses):
if 'NNCFLinear' in str(op):
print("\t\t({:2}, {:2}) {}".format(idx, i, op))
print("done")
New API, April 2023
import functools
from typing import Dict, Callable, Any, Union, List, Tuple
import os
from pathlib import Path
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from nncf.torch.nncf_network import NNCFNetwork
from nncf.torch.dynamic_graph.graph_tracer import create_input_infos, create_dummy_forward_fn
from nncf.torch import create_compressed_model
from nncf import NNCFConfig
from nncf.torch.initialization import register_default_init_args
from abc import ABC, abstractmethod
# from nncf.experimental.torch.search_building_blocks.search_blocks import get_building_blocks, get_building_blocks_info
from nncf.experimental.common.pruning.nodes_grouping import get_pruning_groups
from nncf.experimental.torch.pruning.operations import PT_EXPERIMENTAL_PRUNING_OPERATOR_METATYPES
from transformers import AutoModelForQuestionAnswering
class BaseDatasetMock(Dataset, ABC):
def __init__(self, input_size: Tuple, num_samples: int = 10):
super().__init__()
self._input_size = input_size
self._len = num_samples
@abstractmethod
def __getitem__(self, index: int) -> Tuple[torch.Tensor, torch.Tensor]:
pass
def __len__(self) -> int:
return self._len
class RandomDatasetMock(BaseDatasetMock):
def __getitem__(self, index):
return torch.rand(self._input_size), torch.zeros(1)
def create_any_mock_dataloader(dataset_cls: type, config: NNCFConfig, num_samples: int = 1,
batch_size: int = 1) -> DataLoader:
input_infos_list = create_input_infos(config)
input_sample_size = input_infos_list[0].shape
data_loader = DataLoader(dataset_cls(input_sample_size[1:], num_samples),
batch_size=batch_size,
num_workers=0, # Workaround
shuffle=False, drop_last=True)
return data_loader
create_random_mock_dataloader = functools.partial(create_any_mock_dataloader, dataset_cls=RandomDatasetMock)
bs=1
nbatch = 3200 # dummy
seqlen = 384
model_path = 'bert-base-uncased'
model = AutoModelForQuestionAnswering.from_pretrained(model_path)
nncf_cfgdict = dict(
input_info = [dict(sample_size=[bs, seqlen], type='long')]*3,
)
nncf_cfg = NNCFConfig.from_dict(nncf_cfgdict)
mock_dataloader = create_random_mock_dataloader(config=nncf_cfg, num_samples=nbatch*bs, batch_size=bs)
nncf_cfg = register_default_init_args(nncf_cfg, mock_dataloader)
nncf_ctrl, nncf_model = create_compressed_model(model, nncf_cfg)
pruning_producing_types = ['linear']
nncf_graph = nncf_model.get_original_graph()
pruning_groups = get_pruning_groups(nncf_model.get_graph(),
PT_EXPERIMENTAL_PRUNING_OPERATOR_METATYPES,
pruning_producing_types,
Path(os.getcwd()))
groups = []
for group_id, group in enumerate(pruning_groups):
ctxes = []
print(f"\nGroup {group_id}")
for block in group.dim_blocks:
nncf_node = nncf_graph.get_node_by_id(block.producer_id)
# module = nncf_model.nncf.get_containing_module(nncf_node.node_name)
print("\t- {} | {} ---".format(group_id, nncf_node))
print("end")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Setup
Wrap Microsoft's Swin with NNCF + Block Tracing
Output (single layer of the QKV projection is not captured)