Created
October 1, 2024 12:48
-
-
Save salahelfarissi/ea0fc67528bfc2bfdcdbc42139ef2431 to your computer and use it in GitHub Desktop.
FME Python Caller for reporting on ingested data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Generate report""" | |
import math | |
import os | |
import subprocess | |
import warnings | |
from datetime import date, datetime | |
from enum import Enum | |
from typing import NamedTuple | |
import contextily as cx | |
import fme | |
import fmeobjects | |
import geopandas as gpd | |
import matplotlib | |
import matplotlib.font_manager as fm | |
import matplotlib.pyplot as plt | |
import pandas as pd | |
from babel.dates import format_date | |
from docx import Document, shared | |
from docx.enum.section import WD_ORIENT, WD_SECTION | |
from eomaps import Maps | |
from sqlalchemy import URL, MetaData, Table, and_, case, create_engine, func, select | |
from sqlalchemy.exc import SAWarning | |
from sqlalchemy.orm import sessionmaker | |
warnings.filterwarnings( | |
action="ignore", | |
category=SAWarning, | |
message=".*Did not recognize type 'geometry' of column.*", | |
) | |
warnings.filterwarnings( | |
action="ignore", | |
category=SAWarning, | |
message=".*SELECT statement has a cartesian product.*", | |
) | |
A_ASPECT_RATIO = 1 / math.sqrt(2) | |
ATTRIB_FONT_STYLE = "italic" | |
ATTRIB_FONT_WEIGHT = "bold" | |
BASE_Z_ORDER = 1 | |
BORDER_COLOR = "#96B8BA" | |
BORDER_LINE_WIDTH = 3 | |
BOTTOM_PATCH_COLOR = "#002C39" | |
CLIENT_PATCH_COLOR = "#96B8BA" | |
COLOR_VALUE = "#75a1d2" | |
CONCAVE_HULL_COLOR = "#f68e7260" | |
CONVEX_HULL_COLOR = "#75a1d260" | |
DPI = 300 | |
FAMILY = "Myriad Pro" | |
FONT_DIRECTORY = "fonts" | |
FORMAT = "png" | |
GEOM_COLUMN = "geom" | |
LEGEND_EXT_PATCH_COLOR = "#338198" | |
LEGEND_MOTIF_PATCH_COLOR = "#002C39" | |
LOGO_PATCH_COLOR = "#5D858B" | |
LOGO_PATH = "img/egisLogo.png" | |
MAIN_COLOR = "#abc100" | |
MARKER_COLOR = "#062c3a" | |
NORTH_ARROW_PATH = "img/northArrow.png" | |
NORTH_PATCH_COLOR = "#338198" | |
NORTH_PATCH_EXT_COLOR = "#96B8BA" | |
WARNING_COLOR = "#f68e72" | |
PROJ_CRS = 3857 | |
TITLE_PATCH_COLOR = "#002C39" | |
MAX_FEATURES = 200 | |
matplotlib.use("Agg") | |
font_dirs = [FONT_DIRECTORY] # The path to the custom font file. | |
font_files = fm.findSystemFonts(fontpaths=font_dirs, fontext="ttf") | |
for font_file in font_files: | |
fm.fontManager.addfont(font_file) | |
class Extent(NamedTuple): | |
"""Extent of the mask.""" | |
initial_aspect: float | |
limx_left: float | |
limx_right: float | |
limy_bottom: float | |
limy_top: float | |
centroidx: float | |
centroidy: float | |
class Bounds(NamedTuple): | |
"""Bounds""" | |
minx: float | |
miny: float | |
maxx: float | |
maxy: float | |
class PaperSize(Enum): | |
"""Paper size enum.""" | |
A4 = "A4" | |
A3 = "A3" | |
A2 = "A2" | |
A1 = "A1" | |
A0 = "A0" | |
logger = fmeobjects.FMELogFile() | |
page_size = PaperSize.A3.value | |
main_basemap = cx.providers.CartoDB.PositronNoLabels | |
url_object = URL.create( | |
drivername="postgresql+psycopg", | |
username=fme.macroValues["DB_USER"], | |
password=fme.macroValues["DB_PASSWORD"], | |
host=fme.macroValues["DB_HOST"], | |
port=fme.macroValues["DB_PORT"], | |
database=fme.macroValues["DB_NAME"], | |
) | |
engine = create_engine(url=url_object, echo=False) | |
SessionLocal = sessionmaker(bind=engine, autoflush=False) | |
metadata = MetaData() | |
class FeatureProcessor(object): | |
"""Template Class Interface: | |
When using this class, make sure its name is set as the value of the 'Class to Process Features' | |
transformer parameter. | |
""" | |
def __init__(self): | |
"""Base constructor for class members.""" | |
self.feature_types = {} | |
# ? Table column names mapping | |
self.column_names_mapping = { | |
"feature_type_name": "Nom de la donnée", | |
"created_at": "Date d'ajout", | |
"format_short_name": "Format", | |
"format_long_name": "Long Format", | |
"feature_type_srid": "CRS", | |
"geometry_type": "Type de géométrie", | |
"initial_count": "Nombre d'entités", | |
"final_count": "Nombre d'entités après ingestion", | |
} | |
# ? Directory where the reports will be saved | |
self.reports_dir = fme.macroValues["REPORTS_DIR"] | |
# ? Create reports directory if it does not exist | |
if not os.path.exists(self.reports_dir): | |
fmeobjects.FMELogFile().logMessageString( | |
f"▶▶▶▶▶▶▶ Creating directory {self.reports_dir}" | |
) | |
os.makedirs(self.reports_dir) | |
with SessionLocal() as session: | |
reporting_table = Table( | |
fme.macroValues["REPORT_TABLE_NAME"].lower(), | |
metadata, | |
autoload_with=session.get_bind(), | |
schema=fme.macroValues["SCHEMA_NAME"].lower(), | |
) | |
# ? Read reporting table into a gdf | |
self.reporting_gdf = gpd.read_postgis( | |
sql=select( | |
reporting_table.c.feature_type_name, | |
reporting_table.c.created_at, | |
reporting_table.c.format_short_name, | |
reporting_table.c.format_long_name, | |
reporting_table.c.geometry_type, | |
reporting_table.c.initial_count, | |
reporting_table.c.final_count, | |
reporting_table.c.feature_type_srid, | |
reporting_table.c.geom, | |
).order_by(reporting_table.c.feature_type_name), | |
con=engine, | |
geom_col=GEOM_COLUMN, | |
) | |
# ? Lower case feature_type_name | |
self.reporting_gdf["feature_type_name"] = self.reporting_gdf[ | |
"feature_type_name" | |
].str.lower() | |
# ? Convert to CRS 3857 | |
self.reporting_gdf.to_crs(epsg=PROJ_CRS, inplace=True) | |
# ? Add option to report only sample of the data | |
# ! Constrain user input it 0 < FRACTION <= 1 | |
if fme.macroValues["SAMPLE_DATA"] == "YES": | |
self.reporting_gdf = self.reporting_gdf.sample( | |
frac=float(fme.macroValues["FRACTION"]) | |
) | |
# ? Initialize the document | |
self.document = initialize_report(template_path="template.docx") | |
# ? Get respective styles | |
title_style, bullet_style_level_1 = get_styles( | |
self.document, ["title", "bullet_level_1"] | |
) | |
fmeobjects.FMELogFile().logMessageString("▶▶▶▶▶▶▶ Adding introductory section") | |
# ? Add title | |
self.document.add_paragraph("Qualité de la donnée", style=title_style) | |
# ? Add creation date and schema name | |
self.document.add_paragraph( | |
f"Date de création : {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", | |
style=bullet_style_level_1, | |
) | |
self.document.add_paragraph( | |
f"Nom du schéma : {fme.macroValues['SCHEMA_NAME']}", | |
style=bullet_style_level_1, | |
) | |
def has_support_for(self, support_type: int): | |
"""This method is called by FME to determine if the PythonCaller supports Bulk mode, | |
which allows for significant performance gains when processing large numbers of features. | |
Bulk mode cannot always be supported. | |
More information available in transformer help. | |
""" | |
return support_type == fmeobjects.FME_SUPPORT_FEATURE_TABLE_SHIM | |
def input(self, feature: fmeobjects.FMEFeature): | |
"""This method is called for each feature which enters the PythonCaller. | |
Processed input features can be emitted from this method using self.pyoutput(). | |
If knowledge of all input features is required for processing, then input features should be | |
cached to a list instance variable and processed using group processing or in the close() method. | |
""" | |
self.pyoutput(feature) | |
def close(self): | |
"""This method is called once all the FME Features have been processed from input().""" | |
# ? Get table and bullet styles | |
bullet_stl_l1 = get_styles(self.document, ["bullet_level_1"]) | |
# ? Total count of features | |
self.document.add_paragraph( | |
f"Nombre de données : {self.reporting_gdf.shape[0]}", | |
style=bullet_stl_l1, | |
) | |
# ? Change margins to accomodate the table | |
change_margins(self.document, shared.Cm(0.2)) | |
# ? Add table | |
fmeobjects.FMELogFile().logMessageString("▶▶▶▶▶▶▶ Adding table") | |
add_table(self, self.document) | |
# ? Plot maps grid | |
if fme.macroValues["PLOT_MAPS_GRID"] == "YES": | |
change_margins(self.document, shared.Cm(2.0)) | |
change_orientation(self.document, layout="landscape") | |
fmeobjects.FMELogFile().logMessageString("▶▶▶▶▶▶▶ Plotting maps grid") | |
maps_path = plot_maps_grid( | |
reporting_gdf=self.reporting_gdf, reports_dir=self.reports_dir | |
) | |
# Add the plot to the document | |
for map_path in maps_path: | |
self.document.add_picture(map_path) | |
# ? Align picture to center | |
picture_paragraph = self.document.paragraphs[-1] | |
picture_paragraph.alignment = 1 | |
document_path = rf"{fme.macroValues['FME_MF_DIR']}\{self.reports_dir}\{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.docx" | |
self.document.save(path_or_stream=document_path) | |
# Open the document | |
subprocess.Popen([document_path], shell=True) | |
def plot_maps_grid(reporting_gdf: gpd.GeoDataFrame, reports_dir: str): | |
"""Plot maps grid""" | |
features_per_figure_first = 11 | |
features_per_figure_other = 12 | |
# ? If number of data is 11 or below, number of figures is 1 | |
num_figures = ( | |
math.ceil( | |
(reporting_gdf.shape[0] - features_per_figure_first) | |
/ features_per_figure_other | |
) | |
+ 1 | |
) | |
# ? Initialize the map | |
figsize = define_map_size(page_size=page_size) # A3 | |
# ? Maps paths to return | |
maps_paths = [] | |
# ? Base text size | |
base_text_size = 14 | |
# ? Y position for title and subtitle | |
y_title = 0.55 | |
y_subtitle = 0.48 | |
for fig_num in range(num_figures): | |
# ? For the first figure, include the intro card | |
if fig_num == 0: | |
# ? Shape gives number of rows and columns | |
features_in_current_fig = min( | |
features_per_figure_first, reporting_gdf.shape[0] | |
) | |
# ? Number of rows and columns (adding 1 for the intro card) | |
n = features_in_current_fig + 1 # 1 for the intro card | |
# ? Number of rows and columns | |
# ? Product of rows and columns should equal n = 12 | |
cols = math.ceil(math.sqrt(n)) | |
rows = math.ceil(n / cols) | |
# ? Main card | |
m = Maps(ax=(rows, cols, 1), crs=PROJ_CRS, figsize=figsize) | |
m.set_frame(ec="none") | |
# ? Title | |
add_text( | |
m=m, | |
x=0.25, | |
y=y_title, | |
text="Ingestion des données", | |
color=MARKER_COLOR, | |
text_size=base_text_size / math.sqrt(n) * 1.8, | |
ha="left", | |
) | |
# ? Date | |
d = date.today() | |
formatted_date = format_date(d, locale="fr_FR") | |
add_text( | |
m=m, | |
x=0.25, | |
y=y_subtitle, | |
text=f"Date : {formatted_date}", | |
color=MARKER_COLOR, | |
text_size=base_text_size / math.sqrt(n) * 1.2, | |
ha="left", | |
weight="normal", | |
) | |
# ? Now, add up to 11 maps in the first figure | |
start_index = fig_num * features_per_figure_first | |
end_index = start_index + features_in_current_fig | |
# ? Sort reporting_gdf so that Point comes first, then LineString, then Polygon | |
reporting_gdf["geometry_type"] = pd.Categorical( | |
reporting_gdf["geometry_type"], | |
categories=[ | |
"Point", | |
"MultiPoint", | |
"PointZ", | |
"MultiPointZ", | |
"LineString", | |
"MultiLineString", | |
"LineStringZ", | |
"MultiLineStringZ", | |
"Polygon", | |
"MultiPolygon", | |
"PolygonZ", | |
"MultiPolygonZ", | |
], | |
ordered=True, | |
) | |
reporting_gdf = reporting_gdf.sort_values(by=["geometry_type"]) | |
for index, feature_type_name in enumerate( | |
reporting_gdf["feature_type_name"][start_index:end_index], start=1 | |
): | |
fmeobjects.FMELogFile().logMessageString( | |
f"▶▶▶▶▶ Plotting feature {feature_type_name}: {index}/{features_in_current_fig}, Sheet: {fig_num + 1}/{num_figures}" | |
) | |
# ? Mao where to plot | |
current_m = m.new_map(ax=(rows, cols, index + 1), crs=PROJ_CRS) | |
plot_gdf( | |
m=current_m, | |
reporting_gdf=reporting_gdf, | |
feature_type_name=feature_type_name, | |
n=n, | |
text_size=base_text_size / math.sqrt(n), | |
) | |
# ? Save maps to file | |
maps_grid_path = rf"{fme.macroValues['FME_MF_DIR']}\{reports_dir}\{fme.macroValues['SCHEMA_NAME']}_extent_{len(reporting_gdf['feature_type_name'])}_features_part_{fig_num + 1}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.{FORMAT}" | |
maps_paths.append(maps_grid_path) | |
plt.savefig( | |
fname=maps_grid_path, | |
dpi=DPI, | |
format=FORMAT, | |
pad_inches=0.1, | |
bbox_inches="tight", | |
orientation="landscape", | |
edgecolor="none", | |
facecolor=m.ax.get_facecolor(), | |
) | |
else: | |
# ? Add maps to the figure | |
start_index = ( | |
features_per_figure_first + (fig_num - 1) * features_per_figure_other | |
) | |
features_in_current_fig = min( | |
features_per_figure_other, reporting_gdf.shape[0] - start_index | |
) | |
end_index = start_index + features_in_current_fig | |
# ? Number of rows and columns (no intro card this time) | |
n = features_in_current_fig | |
cols = math.ceil(math.sqrt(n)) | |
rows = math.ceil(n / cols) | |
# ? Main card | |
m = Maps(ax=(rows, cols, 1), crs=PROJ_CRS, figsize=figsize) | |
m.set_frame(ec="none") | |
for index, feature_type_name in enumerate( | |
reporting_gdf["feature_type_name"][start_index:end_index], start=0 | |
): | |
fmeobjects.FMELogFile().logMessageString( | |
f"▶▶▶▶▶ Plotting feature {feature_type_name}: {index + 1}/{features_in_current_fig}, Sheet: {fig_num + 1}/{num_figures}" | |
) | |
if index == 0: | |
current_m = m | |
else: | |
current_m = m.new_map(ax=(rows, cols, index + 1), crs=PROJ_CRS) | |
plot_gdf( | |
m=current_m, | |
reporting_gdf=reporting_gdf, | |
feature_type_name=feature_type_name, | |
n=n, | |
text_size=base_text_size / math.sqrt(n), | |
) | |
# ? Save maps to file | |
maps_grid_path = rf"{fme.macroValues['FME_MF_DIR']}\{reports_dir}\{fme.macroValues['SCHEMA_NAME']}_extent_{len(reporting_gdf['feature_type_name'])}_features_part_{fig_num + 1}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.{FORMAT}" | |
maps_paths.append(maps_grid_path) | |
plt.savefig( | |
fname=maps_grid_path, | |
dpi=DPI, | |
format=FORMAT, | |
pad_inches=0.1, | |
bbox_inches="tight", | |
orientation="landscape", | |
edgecolor="none", | |
facecolor=m.ax.get_facecolor(), | |
) | |
return maps_paths | |
def plot_gdf( | |
m: Maps, | |
reporting_gdf: gpd.GeoDataFrame, | |
feature_type_name: str, | |
n: int, | |
text_size: float, | |
): | |
"""Plot gdf""" | |
cols = math.ceil(math.sqrt(n)) | |
rows = math.ceil(n / cols) | |
y_header = 1 + (cols + rows) / 80 | |
y_value = 1 + (cols + rows) / 120 | |
y_sub_header = -(cols + rows) / 450 | |
y_sub_value = -(cols + rows) / 150 | |
# ? Feature table | |
feature_table = Table( | |
feature_type_name, | |
metadata, | |
autoload_with=engine, | |
schema=fme.macroValues["SCHEMA_NAME"], | |
) | |
with SessionLocal() as session: | |
coords = session.execute( | |
select( | |
func.ST_X(func.ST_Centroid(feature_table.c.geom)).label("x"), | |
func.ST_Y(func.ST_Centroid(feature_table.c.geom)).label("y"), | |
).limit(1) | |
).one() | |
crs = session.execute( | |
select(func.ST_SRID(feature_table.c.geom)).limit(1) | |
).scalar() | |
if fme.macroValues["LIMIT_FEATURES"] == "YES": | |
raw_subq = ( | |
select( | |
case( | |
( | |
and_( | |
func.ST_GeometryType(feature_table.c.geom).in_( | |
["ST_Polygon", "ST_MultiPolygon"] | |
), | |
int(fme.macroValues["MAX_FEATURES"]) > MAX_FEATURES, | |
), | |
func.ST_Centroid( | |
func.ST_Transform(feature_table.c.geom, PROJ_CRS), | |
), | |
), | |
else_=func.ST_Transform(feature_table.c.geom, PROJ_CRS), | |
).label("geom"), | |
) | |
.order_by( | |
func.ST_Distance( | |
feature_table.c.geom, | |
func.ST_SetSRID(func.ST_Point(coords[0], coords[1]), crs), | |
) | |
) | |
.limit(int(fme.macroValues["MAX_FEATURES"])) | |
.subquery() | |
) | |
raw_stmt = select(func.ST_Transform(raw_subq.c.geom, PROJ_CRS).label("geom")) | |
else: | |
raw_subq = select( | |
case( | |
( | |
func.ST_GeometryType(feature_table.c.geom).in_( | |
["ST_Polygon", "ST_MultiPolygon"] | |
), | |
func.ST_Centroid( | |
func.ST_Transform(feature_table.c.geom, PROJ_CRS), | |
), | |
), | |
else_=func.ST_Transform(feature_table.c.geom, PROJ_CRS), | |
).label("geom"), | |
) | |
# ? Read feature table into a gdf | |
feature_gdf = gpd.read_postgis( | |
sql=raw_stmt, | |
con=engine, | |
geom_col="geom", | |
) | |
# ? Remove frame | |
m.set_frame(ec="none") | |
# ? Current gdf | |
current_reporting_feature = reporting_gdf[ | |
reporting_gdf["feature_type_name"] == feature_type_name | |
] | |
# ? Set extent | |
square_extent = get_square_extent(feature_gdf) | |
m.set_extent(extents=square_extent, crs=PROJ_CRS) | |
geom_type = feature_gdf.geom_type.unique().item() | |
if "Point" in geom_type: | |
m.add_gdf(feature_gdf, markersize=1, color=MARKER_COLOR) | |
elif "Line" in geom_type: | |
m.add_gdf(feature_gdf, lw=1, color=MARKER_COLOR) | |
else: | |
m.add_gdf(feature_gdf, fc=MARKER_COLOR, ec="w", lw=1) | |
# ? Add convex hull | |
if fme.macroValues["PLOT_CONVEX_HULL"] == "YES": | |
m.add_gdf( | |
gpd.GeoDataFrame( | |
geometry=[feature_gdf.union_all().convex_hull], crs=PROJ_CRS | |
), | |
fc=CONVEX_HULL_COLOR, | |
lw=1, | |
ec="none", | |
) | |
# ? Add concave hull | |
if fme.macroValues["PLOT_CONCAVE_HULL"] == "YES": | |
concave_stmt = select( | |
func.ST_Collect(raw_subq.c.geom).label("geom"), | |
) | |
concave_gdf = gpd.read_postgis( | |
sql=concave_stmt, | |
con=engine, | |
geom_col="geom", | |
) | |
try: | |
m.add_gdf( | |
concave_gdf.concave_hull(float(fme.macroValues["CONCAVE_RATIO"])), | |
fc=CONCAVE_HULL_COLOR, | |
lw=1, | |
ec="none", | |
) | |
except Exception as e: | |
logger.logMessageString( | |
f"▶▶▶▶▶▶▶ Error plotting concave hull for {feature_type_name}: {e}" | |
) | |
# ? Headers | |
header_1 = add_text(m, 0, y_header, "Donnée", MAIN_COLOR, text_size=text_size) | |
header_2 = add_text( | |
m, | |
0.5, | |
y_header, | |
"Nombre d'entités", | |
MAIN_COLOR, | |
ha="center", | |
text_size=text_size, | |
) | |
header_3 = add_text( | |
m, | |
1, | |
y_header, | |
"Type de géométrie", | |
MAIN_COLOR, | |
ha="right", | |
text_size=text_size, | |
) | |
# ? Lower headers | |
sub_header_1 = add_text( | |
m, | |
1, | |
y_sub_header, | |
"CRS", | |
MAIN_COLOR, | |
text_size=text_size, | |
ha="right", | |
) | |
# ? Values | |
value_1 = add_text( | |
m, 0, y_value, feature_type_name, COLOR_VALUE, text_size=text_size | |
) | |
# count of entities | |
# a new variable so that only if final count is different from initial count, the final count is shown | |
entities_count = current_reporting_feature["final_count"].unique().item() | |
if current_reporting_feature["initial_count"].unique().item() != entities_count: | |
entities_count = f"{current_reporting_feature['initial_count'].unique().item()} / {entities_count}" | |
value_2 = add_text( | |
m, | |
0.5, | |
y_value, | |
entities_count, | |
( | |
COLOR_VALUE | |
if current_reporting_feature["initial_count"].unique().item() | |
== current_reporting_feature["final_count"].unique().item() | |
else WARNING_COLOR | |
), | |
ha="center", | |
text_size=text_size, | |
) | |
value_3 = add_text( | |
m, | |
1, | |
y_value, | |
current_reporting_feature["geometry_type"].unique().tolist()[0], | |
COLOR_VALUE, | |
ha="right", | |
text_size=text_size, | |
) | |
# ? SRID | |
sub_value_1 = add_text( | |
m, | |
1, | |
y_sub_value, | |
current_reporting_feature["feature_type_srid"].unique().item(), | |
COLOR_VALUE, | |
text_size=text_size, | |
ha="right", | |
) | |
# ? No overlap | |
adjust_overlap(header_1, value_1, 0, y_header, type="header") | |
adjust_overlap(header_2, value_2, 0.5, y_header, type="header") | |
adjust_overlap(header_3, value_3, 1, y_header, type="header") | |
adjust_overlap(sub_header_1, sub_value_1, 1, y_sub_value, type="sub_header") | |
# ? Add basemap | |
if fme.macroValues["ADD_BASEMAP"] == "YES": | |
cx.add_basemap( | |
ax=m.ax, | |
source=main_basemap, | |
attribution=False, | |
) | |
# ? Add scalebar | |
m.add_scalebar( | |
auto_position=(0.01, 0.01), | |
rotation=90, | |
n=4, | |
preset="bw", | |
scale_props={"colors": ("w", MARKER_COLOR), "width": 1}, | |
label_props={ | |
"family": FAMILY, | |
"scale": 1.3, | |
"rotation": 90, | |
"offset": -0.4, | |
"color": MARKER_COLOR, | |
}, | |
line_props={"ec": "none"}, | |
) | |
def get_square_extent(feature_gdf: gpd.GeoDataFrame): | |
"""Get square extent.""" | |
minx, miny, maxx, maxy = feature_gdf.total_bounds | |
# Calculate width and height of the current extent | |
width = maxx - minx | |
height = maxy - miny | |
# Determine the side length of the square extent | |
side_length = max(width, height) | |
# Calculate 10% of the side length | |
expansion = 0.1 * side_length | |
# Calculate new square extent with 10% expansion | |
if width > height: | |
delta_y = (side_length - height) / 2 | |
new_minx = minx - expansion / 2 | |
new_maxx = maxx + expansion / 2 | |
new_miny = (miny - delta_y) - expansion / 2 | |
new_maxy = (maxy + delta_y) + expansion / 2 | |
else: | |
delta_x = (side_length - width) / 2 | |
new_minx = (minx - delta_x) - expansion / 2 | |
new_maxx = (maxx + delta_x) + expansion / 2 | |
new_miny = miny - expansion / 2 | |
new_maxy = maxy + expansion / 2 | |
# The square extent is now defined by: | |
square_extent = (new_minx, new_maxx, new_miny, new_maxy) | |
return square_extent | |
def change_margins(doc, margin): | |
"""Change the margins of the document.""" | |
doc.add_section(WD_SECTION.CONTINUOUS) | |
current_section = doc.sections[-1] | |
current_section.left_margin = margin | |
current_section.right_margin = margin | |
current_section.top_margin = margin | |
current_section.bottom_margin = margin | |
def add_text(m, x, y, text, color, text_size=10, weight="bold", ha="left"): | |
"""Add text to the map.""" | |
return m.text( | |
x=x, | |
y=y, | |
s=text, | |
transform=m.ax.transAxes, | |
family=FAMILY, | |
size=text_size, | |
weight=weight, | |
c=color, | |
ha=ha, | |
va="top", | |
) | |
def add_table(self, document): | |
"""Add table""" | |
# ? Get table and bullet styles | |
table_stl = get_styles(document, ["table"]) | |
# ? Remove from gdf geom column | |
data_layer = self.reporting_gdf.drop(["geom"], axis=1) | |
data_layer["created_at"] = data_layer["created_at"].dt.strftime("%Y-%m-%d %H:%M") | |
data_table = document.add_table( | |
rows=data_layer.shape[0] + 1, | |
cols=data_layer.shape[1], | |
style=table_stl, | |
) | |
for j, col_name in enumerate(data_layer.columns): | |
data_table.cell(0, j).text = self.column_names_mapping[col_name] | |
for i in range(data_layer.shape[0]): | |
for j, cell in enumerate(data_table.rows[i + 1].cells): | |
cell.text = str(data_layer.values[i, j]) | |
def initialize_report(template_path: str): | |
"""Initialize a report from a template.""" | |
doc = Document(template_path) | |
doc._body.clear_content() # pylint: disable=protected-access | |
return doc | |
def get_styles(doc: Document, styles_to_get: list = None): | |
"""Get styles from the document.""" | |
all_styles = { | |
"title": doc.styles["Titre 0"], | |
"bullet_level_1": doc.styles["Bullet-1"], | |
"bullet_level_2": doc.styles["Bullet-2"], | |
"table": doc.styles["Grid Table 1 Light Accent 1"], | |
} | |
if styles_to_get is None: | |
# If no specific styles are requested, return all styles | |
return ( | |
all_styles["title"], | |
all_styles["bullet_level_1"], | |
all_styles["bullet_level_2"], | |
all_styles["table"], | |
) | |
# Return only the requested styles | |
requested_styles = [all_styles[style] for style in styles_to_get] | |
return requested_styles if len(requested_styles) > 1 else requested_styles[0] | |
def initialize_map(extent: Extent, page_size: str): | |
"""Initialize map object.""" | |
# ? Define map (figure) size | |
figsize = define_map_size(page_size=page_size) | |
# ? Create map object | |
m = Maps( | |
crs=PROJ_CRS, | |
layer="base", | |
figsize=figsize, | |
num=1, | |
frameon=False, | |
dpi=DPI, | |
) | |
# ? Border color and line width | |
m.ax.spines[:].set_color(BORDER_COLOR) | |
m.ax.spines[:].set_linewidth(BORDER_LINE_WIDTH) | |
# ? Set map extent | |
m.set_extent( | |
extents=( | |
extent.limx_left, | |
extent.limx_right, | |
extent.limy_bottom, | |
extent.limy_top, | |
), | |
crs=PROJ_CRS, | |
) | |
return m | |
def change_orientation(doc: Document, layout: str = "landscape"): | |
"""Change the orientation of the document to landscape.""" | |
current_section = doc.sections[-1] | |
new_width, new_height = current_section.page_height, current_section.page_width | |
new_section = doc.add_section(WD_SECTION.NEW_PAGE) | |
new_section.orientation = WD_ORIENT.LANDSCAPE | |
new_section.page_width = new_width | |
new_section.page_height = new_height | |
current_section = doc.sections[-1] | |
margin = shared.Cm(0.0) if layout == "landscape" else shared.Cm(2.0) | |
current_section.left_margin = margin | |
current_section.right_margin = margin | |
current_section.top_margin = margin | |
current_section.bottom_margin = margin | |
# Set page size to A3 | |
current_section.page_width = ( | |
shared.Cm(42.0) if layout == "landscape" else shared.Cm(21.0) | |
) | |
current_section.page_height = shared.Cm(29.7) | |
def define_map_size(page_size: str) -> tuple[float, float]: | |
"""Define map size for landscape""" | |
mm = 1 / 25.4 | |
return { | |
"A4": (297 * mm, 210 * mm), | |
"A3": (420 * mm, 297 * mm), | |
"A2": (594 * mm, 420 * mm), | |
"A1": (841 * mm, 594 * mm), | |
"A0": (1189 * mm, 841 * mm), | |
}[page_size] | |
def constrain_aspect_ratio(diffx: float, diffy: float) -> tuple[float, float]: | |
"""Constrain aspect ratio.""" | |
aspect_ratio = diffy / diffx | |
if aspect_ratio > A_ASPECT_RATIO: | |
diffy_constrained = diffx * A_ASPECT_RATIO | |
if diffy_constrained < diffy: | |
diffy_constrained = diffy | |
diffx_constrained = diffy / A_ASPECT_RATIO | |
else: | |
diffx_constrained = diffy / A_ASPECT_RATIO | |
if diffx_constrained < diffx: | |
diffx_constrained = diffx | |
diffy_constrained = diffx * A_ASPECT_RATIO | |
return diffx_constrained, diffy_constrained | |
def adjust_overlap(header_text_obj, value_text_obj, x_value, y_value, type="header"): | |
"""Adjust overlap.""" | |
adjustment_step = 0.005 | |
sub_header_1_bbox = header_text_obj.get_window_extent() | |
sub_value_1_bbox = value_text_obj.get_window_extent() | |
while sub_header_1_bbox.overlaps(sub_value_1_bbox): | |
if type == "header": | |
y_value += adjustment_step | |
header_text_obj.set_position((x_value, y_value)) | |
sub_header_1_bbox = header_text_obj.get_window_extent() | |
else: | |
y_value -= adjustment_step # Move the subtitle down | |
value_text_obj.set_position((x_value, y_value)) | |
sub_value_1_bbox = value_text_obj.get_window_extent() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment