Skip to content

Instantly share code, notes, and snippets.

@salahelfarissi
Created October 1, 2024 12:48
Show Gist options
  • Save salahelfarissi/ea0fc67528bfc2bfdcdbc42139ef2431 to your computer and use it in GitHub Desktop.
Save salahelfarissi/ea0fc67528bfc2bfdcdbc42139ef2431 to your computer and use it in GitHub Desktop.
FME Python Caller for reporting on ingested data
"""Generate report"""
import math
import os
import subprocess
import warnings
from datetime import date, datetime
from enum import Enum
from typing import NamedTuple
import contextily as cx
import fme
import fmeobjects
import geopandas as gpd
import matplotlib
import matplotlib.font_manager as fm
import matplotlib.pyplot as plt
import pandas as pd
from babel.dates import format_date
from docx import Document, shared
from docx.enum.section import WD_ORIENT, WD_SECTION
from eomaps import Maps
from sqlalchemy import URL, MetaData, Table, and_, case, create_engine, func, select
from sqlalchemy.exc import SAWarning
from sqlalchemy.orm import sessionmaker
warnings.filterwarnings(
action="ignore",
category=SAWarning,
message=".*Did not recognize type 'geometry' of column.*",
)
warnings.filterwarnings(
action="ignore",
category=SAWarning,
message=".*SELECT statement has a cartesian product.*",
)
A_ASPECT_RATIO = 1 / math.sqrt(2)
ATTRIB_FONT_STYLE = "italic"
ATTRIB_FONT_WEIGHT = "bold"
BASE_Z_ORDER = 1
BORDER_COLOR = "#96B8BA"
BORDER_LINE_WIDTH = 3
BOTTOM_PATCH_COLOR = "#002C39"
CLIENT_PATCH_COLOR = "#96B8BA"
COLOR_VALUE = "#75a1d2"
CONCAVE_HULL_COLOR = "#f68e7260"
CONVEX_HULL_COLOR = "#75a1d260"
DPI = 300
FAMILY = "Myriad Pro"
FONT_DIRECTORY = "fonts"
FORMAT = "png"
GEOM_COLUMN = "geom"
LEGEND_EXT_PATCH_COLOR = "#338198"
LEGEND_MOTIF_PATCH_COLOR = "#002C39"
LOGO_PATCH_COLOR = "#5D858B"
LOGO_PATH = "img/egisLogo.png"
MAIN_COLOR = "#abc100"
MARKER_COLOR = "#062c3a"
NORTH_ARROW_PATH = "img/northArrow.png"
NORTH_PATCH_COLOR = "#338198"
NORTH_PATCH_EXT_COLOR = "#96B8BA"
WARNING_COLOR = "#f68e72"
PROJ_CRS = 3857
TITLE_PATCH_COLOR = "#002C39"
MAX_FEATURES = 200
matplotlib.use("Agg")
font_dirs = [FONT_DIRECTORY] # The path to the custom font file.
font_files = fm.findSystemFonts(fontpaths=font_dirs, fontext="ttf")
for font_file in font_files:
fm.fontManager.addfont(font_file)
class Extent(NamedTuple):
"""Extent of the mask."""
initial_aspect: float
limx_left: float
limx_right: float
limy_bottom: float
limy_top: float
centroidx: float
centroidy: float
class Bounds(NamedTuple):
"""Bounds"""
minx: float
miny: float
maxx: float
maxy: float
class PaperSize(Enum):
"""Paper size enum."""
A4 = "A4"
A3 = "A3"
A2 = "A2"
A1 = "A1"
A0 = "A0"
logger = fmeobjects.FMELogFile()
page_size = PaperSize.A3.value
main_basemap = cx.providers.CartoDB.PositronNoLabels
url_object = URL.create(
drivername="postgresql+psycopg",
username=fme.macroValues["DB_USER"],
password=fme.macroValues["DB_PASSWORD"],
host=fme.macroValues["DB_HOST"],
port=fme.macroValues["DB_PORT"],
database=fme.macroValues["DB_NAME"],
)
engine = create_engine(url=url_object, echo=False)
SessionLocal = sessionmaker(bind=engine, autoflush=False)
metadata = MetaData()
class FeatureProcessor(object):
"""Template Class Interface:
When using this class, make sure its name is set as the value of the 'Class to Process Features'
transformer parameter.
"""
def __init__(self):
"""Base constructor for class members."""
self.feature_types = {}
# ? Table column names mapping
self.column_names_mapping = {
"feature_type_name": "Nom de la donnée",
"created_at": "Date d'ajout",
"format_short_name": "Format",
"format_long_name": "Long Format",
"feature_type_srid": "CRS",
"geometry_type": "Type de géométrie",
"initial_count": "Nombre d'entités",
"final_count": "Nombre d'entités après ingestion",
}
# ? Directory where the reports will be saved
self.reports_dir = fme.macroValues["REPORTS_DIR"]
# ? Create reports directory if it does not exist
if not os.path.exists(self.reports_dir):
fmeobjects.FMELogFile().logMessageString(
f"▶▶▶▶▶▶▶ Creating directory {self.reports_dir}"
)
os.makedirs(self.reports_dir)
with SessionLocal() as session:
reporting_table = Table(
fme.macroValues["REPORT_TABLE_NAME"].lower(),
metadata,
autoload_with=session.get_bind(),
schema=fme.macroValues["SCHEMA_NAME"].lower(),
)
# ? Read reporting table into a gdf
self.reporting_gdf = gpd.read_postgis(
sql=select(
reporting_table.c.feature_type_name,
reporting_table.c.created_at,
reporting_table.c.format_short_name,
reporting_table.c.format_long_name,
reporting_table.c.geometry_type,
reporting_table.c.initial_count,
reporting_table.c.final_count,
reporting_table.c.feature_type_srid,
reporting_table.c.geom,
).order_by(reporting_table.c.feature_type_name),
con=engine,
geom_col=GEOM_COLUMN,
)
# ? Lower case feature_type_name
self.reporting_gdf["feature_type_name"] = self.reporting_gdf[
"feature_type_name"
].str.lower()
# ? Convert to CRS 3857
self.reporting_gdf.to_crs(epsg=PROJ_CRS, inplace=True)
# ? Add option to report only sample of the data
# ! Constrain user input it 0 < FRACTION <= 1
if fme.macroValues["SAMPLE_DATA"] == "YES":
self.reporting_gdf = self.reporting_gdf.sample(
frac=float(fme.macroValues["FRACTION"])
)
# ? Initialize the document
self.document = initialize_report(template_path="template.docx")
# ? Get respective styles
title_style, bullet_style_level_1 = get_styles(
self.document, ["title", "bullet_level_1"]
)
fmeobjects.FMELogFile().logMessageString("▶▶▶▶▶▶▶ Adding introductory section")
# ? Add title
self.document.add_paragraph("Qualité de la donnée", style=title_style)
# ? Add creation date and schema name
self.document.add_paragraph(
f"Date de création : {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
style=bullet_style_level_1,
)
self.document.add_paragraph(
f"Nom du schéma : {fme.macroValues['SCHEMA_NAME']}",
style=bullet_style_level_1,
)
def has_support_for(self, support_type: int):
"""This method is called by FME to determine if the PythonCaller supports Bulk mode,
which allows for significant performance gains when processing large numbers of features.
Bulk mode cannot always be supported.
More information available in transformer help.
"""
return support_type == fmeobjects.FME_SUPPORT_FEATURE_TABLE_SHIM
def input(self, feature: fmeobjects.FMEFeature):
"""This method is called for each feature which enters the PythonCaller.
Processed input features can be emitted from this method using self.pyoutput().
If knowledge of all input features is required for processing, then input features should be
cached to a list instance variable and processed using group processing or in the close() method.
"""
self.pyoutput(feature)
def close(self):
"""This method is called once all the FME Features have been processed from input()."""
# ? Get table and bullet styles
bullet_stl_l1 = get_styles(self.document, ["bullet_level_1"])
# ? Total count of features
self.document.add_paragraph(
f"Nombre de données : {self.reporting_gdf.shape[0]}",
style=bullet_stl_l1,
)
# ? Change margins to accomodate the table
change_margins(self.document, shared.Cm(0.2))
# ? Add table
fmeobjects.FMELogFile().logMessageString("▶▶▶▶▶▶▶ Adding table")
add_table(self, self.document)
# ? Plot maps grid
if fme.macroValues["PLOT_MAPS_GRID"] == "YES":
change_margins(self.document, shared.Cm(2.0))
change_orientation(self.document, layout="landscape")
fmeobjects.FMELogFile().logMessageString("▶▶▶▶▶▶▶ Plotting maps grid")
maps_path = plot_maps_grid(
reporting_gdf=self.reporting_gdf, reports_dir=self.reports_dir
)
# Add the plot to the document
for map_path in maps_path:
self.document.add_picture(map_path)
# ? Align picture to center
picture_paragraph = self.document.paragraphs[-1]
picture_paragraph.alignment = 1
document_path = rf"{fme.macroValues['FME_MF_DIR']}\{self.reports_dir}\{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.docx"
self.document.save(path_or_stream=document_path)
# Open the document
subprocess.Popen([document_path], shell=True)
def plot_maps_grid(reporting_gdf: gpd.GeoDataFrame, reports_dir: str):
"""Plot maps grid"""
features_per_figure_first = 11
features_per_figure_other = 12
# ? If number of data is 11 or below, number of figures is 1
num_figures = (
math.ceil(
(reporting_gdf.shape[0] - features_per_figure_first)
/ features_per_figure_other
)
+ 1
)
# ? Initialize the map
figsize = define_map_size(page_size=page_size) # A3
# ? Maps paths to return
maps_paths = []
# ? Base text size
base_text_size = 14
# ? Y position for title and subtitle
y_title = 0.55
y_subtitle = 0.48
for fig_num in range(num_figures):
# ? For the first figure, include the intro card
if fig_num == 0:
# ? Shape gives number of rows and columns
features_in_current_fig = min(
features_per_figure_first, reporting_gdf.shape[0]
)
# ? Number of rows and columns (adding 1 for the intro card)
n = features_in_current_fig + 1 # 1 for the intro card
# ? Number of rows and columns
# ? Product of rows and columns should equal n = 12
cols = math.ceil(math.sqrt(n))
rows = math.ceil(n / cols)
# ? Main card
m = Maps(ax=(rows, cols, 1), crs=PROJ_CRS, figsize=figsize)
m.set_frame(ec="none")
# ? Title
add_text(
m=m,
x=0.25,
y=y_title,
text="Ingestion des données",
color=MARKER_COLOR,
text_size=base_text_size / math.sqrt(n) * 1.8,
ha="left",
)
# ? Date
d = date.today()
formatted_date = format_date(d, locale="fr_FR")
add_text(
m=m,
x=0.25,
y=y_subtitle,
text=f"Date : {formatted_date}",
color=MARKER_COLOR,
text_size=base_text_size / math.sqrt(n) * 1.2,
ha="left",
weight="normal",
)
# ? Now, add up to 11 maps in the first figure
start_index = fig_num * features_per_figure_first
end_index = start_index + features_in_current_fig
# ? Sort reporting_gdf so that Point comes first, then LineString, then Polygon
reporting_gdf["geometry_type"] = pd.Categorical(
reporting_gdf["geometry_type"],
categories=[
"Point",
"MultiPoint",
"PointZ",
"MultiPointZ",
"LineString",
"MultiLineString",
"LineStringZ",
"MultiLineStringZ",
"Polygon",
"MultiPolygon",
"PolygonZ",
"MultiPolygonZ",
],
ordered=True,
)
reporting_gdf = reporting_gdf.sort_values(by=["geometry_type"])
for index, feature_type_name in enumerate(
reporting_gdf["feature_type_name"][start_index:end_index], start=1
):
fmeobjects.FMELogFile().logMessageString(
f"▶▶▶▶▶ Plotting feature {feature_type_name}: {index}/{features_in_current_fig}, Sheet: {fig_num + 1}/{num_figures}"
)
# ? Mao where to plot
current_m = m.new_map(ax=(rows, cols, index + 1), crs=PROJ_CRS)
plot_gdf(
m=current_m,
reporting_gdf=reporting_gdf,
feature_type_name=feature_type_name,
n=n,
text_size=base_text_size / math.sqrt(n),
)
# ? Save maps to file
maps_grid_path = rf"{fme.macroValues['FME_MF_DIR']}\{reports_dir}\{fme.macroValues['SCHEMA_NAME']}_extent_{len(reporting_gdf['feature_type_name'])}_features_part_{fig_num + 1}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.{FORMAT}"
maps_paths.append(maps_grid_path)
plt.savefig(
fname=maps_grid_path,
dpi=DPI,
format=FORMAT,
pad_inches=0.1,
bbox_inches="tight",
orientation="landscape",
edgecolor="none",
facecolor=m.ax.get_facecolor(),
)
else:
# ? Add maps to the figure
start_index = (
features_per_figure_first + (fig_num - 1) * features_per_figure_other
)
features_in_current_fig = min(
features_per_figure_other, reporting_gdf.shape[0] - start_index
)
end_index = start_index + features_in_current_fig
# ? Number of rows and columns (no intro card this time)
n = features_in_current_fig
cols = math.ceil(math.sqrt(n))
rows = math.ceil(n / cols)
# ? Main card
m = Maps(ax=(rows, cols, 1), crs=PROJ_CRS, figsize=figsize)
m.set_frame(ec="none")
for index, feature_type_name in enumerate(
reporting_gdf["feature_type_name"][start_index:end_index], start=0
):
fmeobjects.FMELogFile().logMessageString(
f"▶▶▶▶▶ Plotting feature {feature_type_name}: {index + 1}/{features_in_current_fig}, Sheet: {fig_num + 1}/{num_figures}"
)
if index == 0:
current_m = m
else:
current_m = m.new_map(ax=(rows, cols, index + 1), crs=PROJ_CRS)
plot_gdf(
m=current_m,
reporting_gdf=reporting_gdf,
feature_type_name=feature_type_name,
n=n,
text_size=base_text_size / math.sqrt(n),
)
# ? Save maps to file
maps_grid_path = rf"{fme.macroValues['FME_MF_DIR']}\{reports_dir}\{fme.macroValues['SCHEMA_NAME']}_extent_{len(reporting_gdf['feature_type_name'])}_features_part_{fig_num + 1}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.{FORMAT}"
maps_paths.append(maps_grid_path)
plt.savefig(
fname=maps_grid_path,
dpi=DPI,
format=FORMAT,
pad_inches=0.1,
bbox_inches="tight",
orientation="landscape",
edgecolor="none",
facecolor=m.ax.get_facecolor(),
)
return maps_paths
def plot_gdf(
m: Maps,
reporting_gdf: gpd.GeoDataFrame,
feature_type_name: str,
n: int,
text_size: float,
):
"""Plot gdf"""
cols = math.ceil(math.sqrt(n))
rows = math.ceil(n / cols)
y_header = 1 + (cols + rows) / 80
y_value = 1 + (cols + rows) / 120
y_sub_header = -(cols + rows) / 450
y_sub_value = -(cols + rows) / 150
# ? Feature table
feature_table = Table(
feature_type_name,
metadata,
autoload_with=engine,
schema=fme.macroValues["SCHEMA_NAME"],
)
with SessionLocal() as session:
coords = session.execute(
select(
func.ST_X(func.ST_Centroid(feature_table.c.geom)).label("x"),
func.ST_Y(func.ST_Centroid(feature_table.c.geom)).label("y"),
).limit(1)
).one()
crs = session.execute(
select(func.ST_SRID(feature_table.c.geom)).limit(1)
).scalar()
if fme.macroValues["LIMIT_FEATURES"] == "YES":
raw_subq = (
select(
case(
(
and_(
func.ST_GeometryType(feature_table.c.geom).in_(
["ST_Polygon", "ST_MultiPolygon"]
),
int(fme.macroValues["MAX_FEATURES"]) > MAX_FEATURES,
),
func.ST_Centroid(
func.ST_Transform(feature_table.c.geom, PROJ_CRS),
),
),
else_=func.ST_Transform(feature_table.c.geom, PROJ_CRS),
).label("geom"),
)
.order_by(
func.ST_Distance(
feature_table.c.geom,
func.ST_SetSRID(func.ST_Point(coords[0], coords[1]), crs),
)
)
.limit(int(fme.macroValues["MAX_FEATURES"]))
.subquery()
)
raw_stmt = select(func.ST_Transform(raw_subq.c.geom, PROJ_CRS).label("geom"))
else:
raw_subq = select(
case(
(
func.ST_GeometryType(feature_table.c.geom).in_(
["ST_Polygon", "ST_MultiPolygon"]
),
func.ST_Centroid(
func.ST_Transform(feature_table.c.geom, PROJ_CRS),
),
),
else_=func.ST_Transform(feature_table.c.geom, PROJ_CRS),
).label("geom"),
)
# ? Read feature table into a gdf
feature_gdf = gpd.read_postgis(
sql=raw_stmt,
con=engine,
geom_col="geom",
)
# ? Remove frame
m.set_frame(ec="none")
# ? Current gdf
current_reporting_feature = reporting_gdf[
reporting_gdf["feature_type_name"] == feature_type_name
]
# ? Set extent
square_extent = get_square_extent(feature_gdf)
m.set_extent(extents=square_extent, crs=PROJ_CRS)
geom_type = feature_gdf.geom_type.unique().item()
if "Point" in geom_type:
m.add_gdf(feature_gdf, markersize=1, color=MARKER_COLOR)
elif "Line" in geom_type:
m.add_gdf(feature_gdf, lw=1, color=MARKER_COLOR)
else:
m.add_gdf(feature_gdf, fc=MARKER_COLOR, ec="w", lw=1)
# ? Add convex hull
if fme.macroValues["PLOT_CONVEX_HULL"] == "YES":
m.add_gdf(
gpd.GeoDataFrame(
geometry=[feature_gdf.union_all().convex_hull], crs=PROJ_CRS
),
fc=CONVEX_HULL_COLOR,
lw=1,
ec="none",
)
# ? Add concave hull
if fme.macroValues["PLOT_CONCAVE_HULL"] == "YES":
concave_stmt = select(
func.ST_Collect(raw_subq.c.geom).label("geom"),
)
concave_gdf = gpd.read_postgis(
sql=concave_stmt,
con=engine,
geom_col="geom",
)
try:
m.add_gdf(
concave_gdf.concave_hull(float(fme.macroValues["CONCAVE_RATIO"])),
fc=CONCAVE_HULL_COLOR,
lw=1,
ec="none",
)
except Exception as e:
logger.logMessageString(
f"▶▶▶▶▶▶▶ Error plotting concave hull for {feature_type_name}: {e}"
)
# ? Headers
header_1 = add_text(m, 0, y_header, "Donnée", MAIN_COLOR, text_size=text_size)
header_2 = add_text(
m,
0.5,
y_header,
"Nombre d'entités",
MAIN_COLOR,
ha="center",
text_size=text_size,
)
header_3 = add_text(
m,
1,
y_header,
"Type de géométrie",
MAIN_COLOR,
ha="right",
text_size=text_size,
)
# ? Lower headers
sub_header_1 = add_text(
m,
1,
y_sub_header,
"CRS",
MAIN_COLOR,
text_size=text_size,
ha="right",
)
# ? Values
value_1 = add_text(
m, 0, y_value, feature_type_name, COLOR_VALUE, text_size=text_size
)
# count of entities
# a new variable so that only if final count is different from initial count, the final count is shown
entities_count = current_reporting_feature["final_count"].unique().item()
if current_reporting_feature["initial_count"].unique().item() != entities_count:
entities_count = f"{current_reporting_feature['initial_count'].unique().item()} / {entities_count}"
value_2 = add_text(
m,
0.5,
y_value,
entities_count,
(
COLOR_VALUE
if current_reporting_feature["initial_count"].unique().item()
== current_reporting_feature["final_count"].unique().item()
else WARNING_COLOR
),
ha="center",
text_size=text_size,
)
value_3 = add_text(
m,
1,
y_value,
current_reporting_feature["geometry_type"].unique().tolist()[0],
COLOR_VALUE,
ha="right",
text_size=text_size,
)
# ? SRID
sub_value_1 = add_text(
m,
1,
y_sub_value,
current_reporting_feature["feature_type_srid"].unique().item(),
COLOR_VALUE,
text_size=text_size,
ha="right",
)
# ? No overlap
adjust_overlap(header_1, value_1, 0, y_header, type="header")
adjust_overlap(header_2, value_2, 0.5, y_header, type="header")
adjust_overlap(header_3, value_3, 1, y_header, type="header")
adjust_overlap(sub_header_1, sub_value_1, 1, y_sub_value, type="sub_header")
# ? Add basemap
if fme.macroValues["ADD_BASEMAP"] == "YES":
cx.add_basemap(
ax=m.ax,
source=main_basemap,
attribution=False,
)
# ? Add scalebar
m.add_scalebar(
auto_position=(0.01, 0.01),
rotation=90,
n=4,
preset="bw",
scale_props={"colors": ("w", MARKER_COLOR), "width": 1},
label_props={
"family": FAMILY,
"scale": 1.3,
"rotation": 90,
"offset": -0.4,
"color": MARKER_COLOR,
},
line_props={"ec": "none"},
)
def get_square_extent(feature_gdf: gpd.GeoDataFrame):
"""Get square extent."""
minx, miny, maxx, maxy = feature_gdf.total_bounds
# Calculate width and height of the current extent
width = maxx - minx
height = maxy - miny
# Determine the side length of the square extent
side_length = max(width, height)
# Calculate 10% of the side length
expansion = 0.1 * side_length
# Calculate new square extent with 10% expansion
if width > height:
delta_y = (side_length - height) / 2
new_minx = minx - expansion / 2
new_maxx = maxx + expansion / 2
new_miny = (miny - delta_y) - expansion / 2
new_maxy = (maxy + delta_y) + expansion / 2
else:
delta_x = (side_length - width) / 2
new_minx = (minx - delta_x) - expansion / 2
new_maxx = (maxx + delta_x) + expansion / 2
new_miny = miny - expansion / 2
new_maxy = maxy + expansion / 2
# The square extent is now defined by:
square_extent = (new_minx, new_maxx, new_miny, new_maxy)
return square_extent
def change_margins(doc, margin):
"""Change the margins of the document."""
doc.add_section(WD_SECTION.CONTINUOUS)
current_section = doc.sections[-1]
current_section.left_margin = margin
current_section.right_margin = margin
current_section.top_margin = margin
current_section.bottom_margin = margin
def add_text(m, x, y, text, color, text_size=10, weight="bold", ha="left"):
"""Add text to the map."""
return m.text(
x=x,
y=y,
s=text,
transform=m.ax.transAxes,
family=FAMILY,
size=text_size,
weight=weight,
c=color,
ha=ha,
va="top",
)
def add_table(self, document):
"""Add table"""
# ? Get table and bullet styles
table_stl = get_styles(document, ["table"])
# ? Remove from gdf geom column
data_layer = self.reporting_gdf.drop(["geom"], axis=1)
data_layer["created_at"] = data_layer["created_at"].dt.strftime("%Y-%m-%d %H:%M")
data_table = document.add_table(
rows=data_layer.shape[0] + 1,
cols=data_layer.shape[1],
style=table_stl,
)
for j, col_name in enumerate(data_layer.columns):
data_table.cell(0, j).text = self.column_names_mapping[col_name]
for i in range(data_layer.shape[0]):
for j, cell in enumerate(data_table.rows[i + 1].cells):
cell.text = str(data_layer.values[i, j])
def initialize_report(template_path: str):
"""Initialize a report from a template."""
doc = Document(template_path)
doc._body.clear_content() # pylint: disable=protected-access
return doc
def get_styles(doc: Document, styles_to_get: list = None):
"""Get styles from the document."""
all_styles = {
"title": doc.styles["Titre 0"],
"bullet_level_1": doc.styles["Bullet-1"],
"bullet_level_2": doc.styles["Bullet-2"],
"table": doc.styles["Grid Table 1 Light Accent 1"],
}
if styles_to_get is None:
# If no specific styles are requested, return all styles
return (
all_styles["title"],
all_styles["bullet_level_1"],
all_styles["bullet_level_2"],
all_styles["table"],
)
# Return only the requested styles
requested_styles = [all_styles[style] for style in styles_to_get]
return requested_styles if len(requested_styles) > 1 else requested_styles[0]
def initialize_map(extent: Extent, page_size: str):
"""Initialize map object."""
# ? Define map (figure) size
figsize = define_map_size(page_size=page_size)
# ? Create map object
m = Maps(
crs=PROJ_CRS,
layer="base",
figsize=figsize,
num=1,
frameon=False,
dpi=DPI,
)
# ? Border color and line width
m.ax.spines[:].set_color(BORDER_COLOR)
m.ax.spines[:].set_linewidth(BORDER_LINE_WIDTH)
# ? Set map extent
m.set_extent(
extents=(
extent.limx_left,
extent.limx_right,
extent.limy_bottom,
extent.limy_top,
),
crs=PROJ_CRS,
)
return m
def change_orientation(doc: Document, layout: str = "landscape"):
"""Change the orientation of the document to landscape."""
current_section = doc.sections[-1]
new_width, new_height = current_section.page_height, current_section.page_width
new_section = doc.add_section(WD_SECTION.NEW_PAGE)
new_section.orientation = WD_ORIENT.LANDSCAPE
new_section.page_width = new_width
new_section.page_height = new_height
current_section = doc.sections[-1]
margin = shared.Cm(0.0) if layout == "landscape" else shared.Cm(2.0)
current_section.left_margin = margin
current_section.right_margin = margin
current_section.top_margin = margin
current_section.bottom_margin = margin
# Set page size to A3
current_section.page_width = (
shared.Cm(42.0) if layout == "landscape" else shared.Cm(21.0)
)
current_section.page_height = shared.Cm(29.7)
def define_map_size(page_size: str) -> tuple[float, float]:
"""Define map size for landscape"""
mm = 1 / 25.4
return {
"A4": (297 * mm, 210 * mm),
"A3": (420 * mm, 297 * mm),
"A2": (594 * mm, 420 * mm),
"A1": (841 * mm, 594 * mm),
"A0": (1189 * mm, 841 * mm),
}[page_size]
def constrain_aspect_ratio(diffx: float, diffy: float) -> tuple[float, float]:
"""Constrain aspect ratio."""
aspect_ratio = diffy / diffx
if aspect_ratio > A_ASPECT_RATIO:
diffy_constrained = diffx * A_ASPECT_RATIO
if diffy_constrained < diffy:
diffy_constrained = diffy
diffx_constrained = diffy / A_ASPECT_RATIO
else:
diffx_constrained = diffy / A_ASPECT_RATIO
if diffx_constrained < diffx:
diffx_constrained = diffx
diffy_constrained = diffx * A_ASPECT_RATIO
return diffx_constrained, diffy_constrained
def adjust_overlap(header_text_obj, value_text_obj, x_value, y_value, type="header"):
"""Adjust overlap."""
adjustment_step = 0.005
sub_header_1_bbox = header_text_obj.get_window_extent()
sub_value_1_bbox = value_text_obj.get_window_extent()
while sub_header_1_bbox.overlaps(sub_value_1_bbox):
if type == "header":
y_value += adjustment_step
header_text_obj.set_position((x_value, y_value))
sub_header_1_bbox = header_text_obj.get_window_extent()
else:
y_value -= adjustment_step # Move the subtitle down
value_text_obj.set_position((x_value, y_value))
sub_value_1_bbox = value_text_obj.get_window_extent()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment