Created
July 4, 2021 04:27
-
-
Save belyak/758039f33d3906187c0104ddfa65eb16 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import itertools | |
import math | |
import os | |
import unittest | |
from typing import List, Optional | |
import yaml | |
def get_files_iter(): | |
return iter((d for d in os.listdir(".") if (d.startswith("docker-compose-") and d.endswith(".yaml")))) | |
class ConnectionPoint: | |
def __init__(self, x, y): | |
self.x = x | |
self.y = y | |
def __iter__(self): | |
return iter([self.x, self.y]) | |
def __repr__(self): | |
return f"{self.__class__.__name__}({self.x}, {self.y})" | |
def __add__(self, other): | |
return ConnectionPoint(self.x + other.x, self.y + other.y) | |
@classmethod | |
def distance(cls, p1:'ConnectionPoint', p2: 'ConnectionPoint'): | |
return math.sqrt((p1.x - p2.x)**2 + (p1.y - p2.y)**2) | |
class Component: | |
_extra = 8 | |
_default_nested_spaces = 0 | |
def __init__(self, _name, _ip): | |
self.name = _name | |
self._ip = _ip | |
self.lines = [self.name, self._ip, ""] | |
self._nested_spaces = self._default_nested_spaces | |
self.top_margin = 1 | |
self.bottom_margin = 1 | |
self.left_margin = 1 | |
self.right_margin = 1 | |
self.comp_height = len(self.lines) + 2 + self.top_margin + self.bottom_margin | |
max_content_len = max(len(line) for line in self.lines) | |
self.comp_weight = self.left_margin + self.right_margin + self._nested_spaces + max_content_len + (self._extra * 2) + 2 | |
def get_connection_points(self) -> List[ConnectionPoint]: | |
middle_y = self.comp_height // 2 | |
middle_x = self.comp_weight // 2 | |
points = [ | |
ConnectionPoint(self.left_margin, middle_y), | |
ConnectionPoint(self.comp_weight - self.right_margin - 1, middle_y), | |
ConnectionPoint(middle_x, self.top_margin), | |
ConnectionPoint(middle_x, self.comp_height - self.bottom_margin - 1,), | |
] | |
return points | |
@property | |
def nested_spaces(self): | |
return self._nested_spaces | |
@nested_spaces.setter | |
def nested_spaces(self, new_value): | |
self._nested_spaces = new_value | |
def add_line(self, content: str): | |
self.lines.append(content) | |
@property | |
def max_content_len(self): | |
return max(len(s) for s in self.lines) | |
@property | |
def top_down_line(self): | |
return f"{self.left_margin_cnt}{self.sp_line}+{'-' * (self.max_content_len + self._extra * 2)}+{self.right_margin_cnt}" | |
@property | |
def left_margin_cnt(self): | |
return ' ' * self.left_margin | |
@property | |
def right_margin_cnt(self): | |
return ' ' * self.right_margin | |
@property | |
def sp_line(self): | |
return str(' ' * self._nested_spaces) | |
def render_all_lines(self) -> str: | |
needed_len = self.max_content_len + (self._extra * 2) | |
res = [] | |
for line in self.lines: | |
item_tpl = '+{:^' + str(needed_len) + '}+' | |
item = item_tpl.format(line) | |
item = f"{self.left_margin_cnt}{self.sp_line}{item}" | |
res.append(item) | |
return os.linesep.join(res) | |
def rendered_lines(self) -> List[str]: | |
_ = self.top_down_line | |
needed_len = self.max_content_len + (self._extra * 2) | |
res = [] | |
for line in self.lines: | |
item_tpl = '+{:^' + str(needed_len) + '}+' | |
item = item_tpl.format(line) | |
item = f"{self.sp_line}{self.left_margin_cnt}{item}{self.right_margin_cnt}" | |
res.append(item) | |
top_margin_block = ['',] * self.top_margin | |
bottom_margin_block = ['',] * self.bottom_margin | |
return top_margin_block + [self.top_down_line] + res + [self.top_down_line] + bottom_margin_block | |
@property | |
def width(self): | |
if len(self.lines): | |
return max(len(line) for line in self.lines) + 2 | |
else: | |
return 0 | |
@property | |
def height(self): | |
return 2 + len(self.lines) if len(self.lines) != 0 else 0 | |
def __str__(self): | |
all_lines = self.render_all_lines() | |
_ = self.top_down_line | |
parts = [ | |
self.top_down_line, | |
all_lines, | |
self.top_down_line | |
] | |
return os.linesep.join(parts) | |
def print(self): | |
return print(self) | |
class AbstractServiceDrawer: | |
_default_nested_spaces = 5 | |
def __init__(self, components: Optional[List['Component']], **kwargs): | |
self.components = components or [] | |
self.components_map = {c.name: c for c in self.components} | |
def draw(self, *args, **kwargs): | |
raise NotImplementedError | |
class ListServiceDrawer(AbstractServiceDrawer): | |
def draw(self, *args, **kwargs) -> str: | |
component_reprs = [] | |
for c in self.components: | |
c.nested_spaces = self._default_nested_spaces | |
parts = [ | |
"" | |
"", | |
c.__str__(), | |
"", | |
] | |
component_repr = os.linesep.join(parts) | |
component_reprs.append(component_repr) | |
return os.linesep.join(component_reprs) | |
class ASCIICanvas: | |
def __init__(self, width, height, title=None, **kwargs): | |
self.width = width | |
self.height = height | |
self.lines = self._gen_lines() | |
self.title = title | |
def draw_at_pos(self, x, y, xs, ys, lines): | |
""" | |
Takes another rectangular object (as a list of equal length lines) | |
and prints it to own rectangular ASCII area (canvas) | |
""" | |
print(f"draw at pos x: {x} y: {y}, x scale: {xs}, y scale: {ys}, lines len: {len(lines)}") | |
for l_ix, line in enumerate(lines): | |
try: | |
cnv_y = l_ix + y * ys | |
cnv_x = x * xs | |
target_line = self.lines[cnv_y] | |
self.lines[cnv_y] = target_line[:cnv_x] + line + target_line[cnv_x + len(line):] | |
except Exception as e: | |
sss = e | |
def draw_symbol_at_pos(self, x, y, c): | |
self.lines[y] = self.lines[y][:x] + c + self.lines[y][x + 1:] | |
def __str__(self): | |
def lines_to_component(_lines): | |
top_bottom = ''.join(['+' + ('-' * self.width) + '+']) | |
_p_lines = [f"|{line}|" for line in _lines] | |
return [top_bottom] + _p_lines + [top_bottom] | |
dbg_info = [ | |
f"canvas width: {self.width}, lines count: {len(self.lines)}", | |
f"canvas height: {self.height} lines lens: [{', '.join([f'{len(ln)}' for ln in self.lines])}]", | |
f"canvas debug info end.", | |
"", | |
] | |
if self.title: | |
lines = ["--- *** ---", self.title, "--- ***** ---"] + lines_to_component(self.lines) | |
else: | |
lines = lines_to_component(self.lines) | |
dbg_end = ["--- end ---", " * --- *"] | |
return "\n".join(dbg_info + lines + dbg_end) | |
def _gen_lines(self) -> List[str]: | |
res = [] | |
for _ in range(self.height): | |
res.append(' ' * self.width) | |
return res | |
class LayoutServiceDrawer(AbstractServiceDrawer): | |
def __init__(self, components: Optional[List['Component']], | |
layout, | |
title=None): | |
super().__init__(components) | |
self.title = title | |
self.layout = layout | |
self.connections = layout.get('connections', []) | |
def draw(self, *args, **kwargs): | |
req_w, req_h, min_x, min_y, = self._calc_required_size() | |
print(f"min x: {min_x} min y: {min_y}") | |
xs, ys = self._calc_required_xy_scales() | |
canvas = ASCIICanvas(title=self.title, | |
width=req_w * xs, | |
height=req_h * ys) | |
for component in self.components: | |
_xc, _yc = self.get_component_corr_x_y(component, min_x, min_y) | |
print(f"component name: {component.name} _xc, _yc: ({_xc},{_yc})") | |
canvas.draw_at_pos(_xc, _yc, xs, ys, component.rendered_lines()) | |
for comp_a, _data, in self.connections.items(): | |
if comp_a == component.name: | |
for comp_b, _add_d, in _data.items(): | |
if comp_b not in [c.name for c in self.components]: | |
raise ValueError(f"Incorrect target component {comp_b}!") | |
def _process_points(pp: List[ConnectionPoint], comp: Component): | |
res = [] | |
for cp in pp: | |
res.append( | |
self.component_point_to_canvas(cp, comp) | |
) | |
return res | |
comp_a_cp = _process_points(self.components_map[comp_a].get_connection_points(), | |
comp=self.components_map[comp_a]) | |
comp_b_cp = _process_points(self.components_map[comp_b].get_connection_points(), | |
comp=self.components_map[comp_b]) | |
result = [] | |
for p1, p2, in itertools.product(comp_a_cp, comp_b_cp): | |
result.append((ConnectionPoint.distance(p1, p2), p1, p2,)) | |
best_entry = min(result, key=lambda t: t[0]) | |
_, p1, p2 = best_entry | |
canvas.draw_symbol_at_pos(p1.x, p1.y, 'Z') | |
canvas.draw_symbol_at_pos(p2.x, p2.y, 'z') | |
return str(canvas) | |
def component_point_to_canvas(self, point: ConnectionPoint, component: Component): | |
*_, min_x, min_y, = self._calc_required_size() | |
comp_x_pos, comp_y_pos = self.get_component_corr_x_y(component, min_x, min_y) | |
fx, fy = self.component_x_y_to_canvas(point.x, point.y, comp_x_pos, comp_y_pos) | |
return ConnectionPoint(fx, fy) | |
def component_x_y_to_canvas(self, comp_x, comp_y, comp_x_pos, comp_y_pos): | |
xs, ys = self._calc_required_xy_scales() | |
fin_x = comp_x + comp_x_pos * xs | |
fin_y = comp_y + comp_y_pos * ys | |
return fin_x, fin_y, | |
def get_component_pos_x_y(self, component: Component): | |
return self.layout['services'][component.name]['position'] | |
def get_component_corr_x_y(self, component: Component, min_x, min_y): | |
x, y = self.get_component_pos_x_y(component) | |
x_corr, y_corr = x - min_x, y - min_y | |
return x_corr, y_corr | |
def _calc_required_xy_scales(self): | |
comp_ws, comp_hs = [], [] | |
for comp in self.components: | |
comp_ws.append(comp.comp_weight) | |
comp_hs.append(comp.comp_height) | |
return max(comp_ws) + 1, max(comp_hs) + 1 | |
def _calc_required_size(self): | |
xs_vals = [] | |
ys_vals = [] | |
services = self.layout.get('services', {}) | |
for comp_name, comp_position, in services.items(): | |
try: | |
_px, _py = comp_position['position'] | |
ys_vals.append(_py) | |
xs_vals.append(_px) | |
except KeyError: | |
raise ValueError( | |
f"for the layout {self.__class__.__name__} " | |
f"each service has to have 'position key " | |
f"while {comp_name} hasn't." | |
) | |
(min_y, max_y, | |
min_x, max_x) = (min(ys_vals), max(ys_vals), | |
min(xs_vals), max(xs_vals)) | |
_w = max_x - min_x + 1 | |
_h = max_y - min_y + 1 | |
print(locals()) | |
return _w, _h, min_x, min_y, | |
def process_service_component(global_map, global_list, data, name): | |
try: | |
cp = data['services'][name] | |
except KeyError: | |
raise ValueError("Bad component without 'Services' key!") | |
_ip = cp['networks']['app_net']['ipv4_address'] | |
component = Component(_name=name, | |
_ip=_ip) | |
global_map[name] = component | |
global_list.append(component) | |
def process_docker_file(f, drawer_cls=ListServiceDrawer, **drawer_kwargs): | |
with open(f, 'r') as fd: | |
data = yaml.load(fd, Loader=yaml.FullLoader) | |
print() | |
print(f"docker-compose file: {f}") | |
global_map = {} | |
components = [] | |
for service in data.get('services', []): | |
process_service_component(global_map, components, data, service) | |
connections = [] | |
for srv_from, data, in data.get('connections', []): | |
component = global_map[srv_from] | |
for srv_to, s_data, in data.items(): | |
dest_component = global_map[srv_to] | |
connections.append([component, dest_component]) | |
if connections != []: | |
drawer_kwargs['connections'] = connections | |
d = drawer_cls(components=components, **drawer_kwargs) | |
print(d.draw()) | |
DC_LAYOUT_MAP = { | |
'docker-compose-kes.yaml': { | |
'services': { | |
'kes_api': { | |
'position': (-1, 0,) | |
}, | |
'kes': { | |
'position': (0, 0,) | |
}, | |
'ether_stub': { | |
'position': (0, 1,), | |
}, | |
}, | |
'connections': { | |
'kes_api': { | |
'kes': {'type': 'ZMQ_REQ'} | |
}, | |
'ether_stub': { | |
'kes': {'type': 'GRETAP'} | |
} | |
} | |
} | |
} | |
def generate_ascii_schemes(compose_files=None, limit=None): | |
compose_files = compose_files or list(get_files_iter()) | |
if limit: | |
compose_files = compose_files[limit:] | |
for file in compose_files: | |
kwargs = {} | |
try: | |
kwargs['layout'] = DC_LAYOUT_MAP[file] | |
kwargs['title'] = f"Composition for file '{file}':" | |
drawer_cls = LayoutServiceDrawer | |
except KeyError: | |
drawer_cls = ListServiceDrawer | |
process_docker_file(file, drawer_cls=drawer_cls, **kwargs) | |
if __name__ == '__main__': | |
generate_ascii_schemes() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment