Last active
August 6, 2024 05:34
-
-
Save beskep/19ca93d5449e97a404dc27beffd6b17a to your computer and use it in GitHub Desktop.
misc
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections.abc import Iterable | |
from dataclasses import KW_ONLY, dataclass | |
from pathlib import Path | |
import polars as pl | |
import polars.selectors as cs | |
from polars._typing import ColumnWidthsDefinition | |
from xlsxwriter import Workbook | |
# Names given to the columns of a transposed `describe()` table (see
# `transpose_description`). The first two entries are the columns that
# function casts to unsigned integers.
DESC_COLS = (
    'count',
    'null_count',
    'mean',
    'std',
    'min',
    '25%',
    '50%',
    '75%',
    'max',
)
def transpose_description(desc: pl.DataFrame, decimals: int = 4):
    """Turn a `describe()` table on its side: one row per described column.

    Parameters
    ----------
    desc
        Frame holding a ``statistic`` column plus one column per described
        variable.
    decimals
        Number of decimals the float columns of ``desc`` are rounded to
        before transposing.

    Returns
    -------
    A frame with a ``variable`` column (the former column names) followed by
    one column per entry of ``DESC_COLS``.
    """
    count_columns = DESC_COLS[:2]

    # Round first, while the statistics are still typed per variable.
    rounded = desc.with_columns(cs.float().round(decimals))

    # The statistic labels are replaced by the fixed DESC_COLS header, so the
    # label column itself is dropped before transposing.
    flipped = rounded.drop('statistic').transpose(
        include_header=True,
        column_names=DESC_COLS,
    )

    # Counts go through Float64 before UInt64 -- presumably because the
    # transposed values may arrive as float-like text; confirm against the
    # dtypes `describe()` produces for the selected columns.
    as_unsigned = pl.col(count_columns).cast(pl.Float64).cast(pl.UInt64)

    return flipped.with_columns(as_unsigned).rename({'column': 'variable'})
@dataclass
class PolarsSummary:
    """Descriptive summaries of a polars frame, optionally split by group.

    * ``describe`` returns ``describe()`` statistics for the selected columns.
    * ``count_string`` returns value counts and proportions for string and
      categorical columns.
    * ``write_excel`` writes those tables to the sheets of one workbook.

    When ``group`` is given, every table is computed separately per distinct
    group key and the key columns are prepended to the result.
    """

    # Frame to summarise; both eager and lazy frames are accepted.
    data: pl.DataFrame | pl.LazyFrame
    # Column name(s) to split by. Normalised in `__post_init__` to a non-empty
    # tuple of names, or None when there is no grouping.
    group: str | Iterable[str] | None = None

    # NOTE(review): this is a plain assignment, not the `_: KW_ONLY`
    # annotation the dataclasses sentinel requires, so the fields below are
    # NOT keyword-only. Left unchanged so positional callers keep working.
    _ = KW_ONLY

    # Pass `describe()` output through `transpose_description`.
    transpose: bool = True
    # Decimals used for rounding when `transpose` is enabled.
    decimals: int = 4
    # `write_excel` leaves variables with more distinct values than this out
    # of the 'string count' sheets; a falsy value disables the filter.
    max_string_category: int | None = 42
    # Sort the resulting tables (by group keys / by all columns).
    sort: bool = True
    # Prefix added to the group column names in results; falsy keeps them.
    group_prefix: str | None = 'group:'

    def __post_init__(self):
        """Normalise ``group`` to a non-empty tuple of names, or ``None``."""
        if self.group is None:
            return

        names = (self.group,) if isinstance(self.group, str) else tuple(self.group)
        # An empty iterable means "no grouping". Collapsing it to None keeps
        # the `is None` checks and the truthiness checks in agreement.
        self.group = names or None

    # ------------------------------------------------------------------ helpers

    def _drop_group(self, data: pl.DataFrame | pl.LazyFrame):
        """Return ``data`` without the group columns (missing ones ignored)."""
        if self.group:
            data = data.drop(self.group, strict=False)
        return data

    def _has_columns(self, selector) -> bool:
        """Tell whether any non-group column of ``self.data`` matches."""
        schema = self._drop_group(self.data).select(selector).collect_schema()
        return bool(schema.len())

    def _iter_groups(self):
        """Yield ``(key, frame)`` for every distinct group key."""
        assert isinstance(self.group, tuple)
        # `.lazy().collect()` yields an eager frame for either input type.
        frame = self.data.lazy().collect()
        # Order only has to be kept when the result is not sorted afterwards.
        yield from frame.group_by(self.group, maintain_order=not self.sort)

    def _with_keys(self, frame: pl.DataFrame, key):
        """Prepend the group ``key`` values to ``frame`` as literal columns."""
        assert isinstance(self.group, tuple)
        keys = (pl.lit(k).alias(g) for k, g in zip(key, self.group, strict=True))
        return frame.select(*keys, pl.all())

    def _group_columns(self) -> list[str]:
        """Names of the group columns as they appear in the public results."""
        names = self.group or ()
        if self.group_prefix:
            return [f'{self.group_prefix}{g}' for g in names]
        return list(names)

    def _prefix_group(self, df: pl.DataFrame):
        """Rename the group columns of ``df`` with ``group_prefix``, if set."""
        if self.group and self.group_prefix:
            df = df.rename({g: f'{self.group_prefix}{g}' for g in self.group})
        return df

    # ----------------------------------------------------------------- describe

    def _describe(
        self,
        data: pl.DataFrame | pl.LazyFrame | None = None,
        selector=None,
    ):
        """Describe the ``selector`` columns of ``data`` (default: all data).

        Group columns are removed first. ``selector`` defaults to the numeric
        and boolean columns.
        """
        data = self._drop_group(self.data if data is None else data)
        if selector is None:
            selector = cs.numeric() | cs.boolean()

        desc = data.select(selector).describe()
        if self.transpose:
            # Honour the configured precision instead of the helper's default.
            desc = transpose_description(desc, decimals=self.decimals)
        return desc

    def _describe_by(self, selector=None):
        """Yield one description per group, prefixed with its key columns."""
        for key, frame in self._iter_groups():
            yield self._with_keys(self._describe(frame, selector=selector), key)

    def describe(self, selector=None):
        """Return descriptive statistics, per group when ``group`` is set.

        Parameters
        ----------
        selector
            Columns to describe; defaults to numeric and boolean columns.
        """
        if self.group is None:
            return self._describe(selector=selector)

        df = pl.concat(self._describe_by(selector), how='vertical_relaxed')
        if self.sort:
            df = df.sort(self.group)
        return self._prefix_group(df)

    # ------------------------------------------------------------- count_string

    def _count_string(self, data: pl.DataFrame | pl.LazyFrame | None = None):
        """Count each value of every string/categorical column of ``data``.

        Returns a frame with ``variable``, ``value``, ``count`` and
        ``proportion`` (count divided by the total count of that variable).
        """
        data = self._drop_group(self.data if data is None else data)
        return (
            data.lazy()
            .select(cs.string() | cs.categorical())
            .unpivot()
            .group_by('variable', 'value', maintain_order=True)
            .len('count')
            .with_columns(
                pl.col('count')
                .truediv(pl.sum('count').over('variable'))
                .alias('proportion')
            )
            .collect()
        )

    def _count_string_by(self):
        """Yield one value-count table per group, prefixed with its keys."""
        for key, frame in self._iter_groups():
            yield self._with_keys(self._count_string(frame), key)

    def count_string(self):
        """Return value counts of string columns, per group when grouped."""
        if self.group is None:
            df = self._count_string()
        else:
            df = pl.concat(self._count_string_by(), how='vertical_relaxed')

        if self.sort:
            df = df.sort(pl.all())
        return self._prefix_group(df)

    # -------------------------------------------------------------------- excel

    def write_excel(
        self,
        path: str | Path,
        column_widths: ColumnWidthsDefinition | None = 100,
    ):
        """Write all summaries to the workbook at ``path``.

        Sheets are only created for column kinds that are present:
        ``numeric``, ``temporal``, ``string``, ``string count`` and, when
        grouped, ``string count (pivot)``.

        Parameters
        ----------
        path
            Destination of the workbook.
        column_widths
            Forwarded to ``DataFrame.write_excel`` for every sheet except the
            pivot sheet.
        """
        numeric = cs.numeric() | cs.boolean()
        text = cs.string() | cs.categorical()

        with Workbook(path) as wb:
            # Column checks ignore the group columns: `describe()` drops them
            # and cannot describe a frame that is left with no columns.
            if self._has_columns(numeric):
                self.describe().write_excel(
                    wb, worksheet='numeric', column_widths=column_widths
                )

            if self._has_columns(cs.temporal()):
                self.describe(selector=cs.temporal()).write_excel(
                    wb, worksheet='temporal', column_widths=column_widths
                )

            if not self._has_columns(text):
                return

            self.describe(selector=text).write_excel(
                wb, worksheet='string', column_widths=column_widths
            )

            count = self.count_string()
            if self.max_string_category:
                # Keep only variables with few enough distinct values.
                count = count.filter(
                    pl.col('value').n_unique().over('variable')
                    <= self.max_string_category
                )

            if not count.height:
                return

            count.write_excel(
                wb,
                worksheet='string count',
                column_formats={'proportion': '0.00%'},
                conditional_formats={
                    'proportion': {'type': 'data_bar', 'bar_solid': True}
                },
                column_widths=column_widths,
            )

            if self.group is not None:
                group = self._group_columns()
                (
                    # Keys are cast to strings first so that non-string keys
                    # can take the 'Null' placeholder as well.
                    count.with_columns(
                        pl.col(group).cast(pl.String).fill_null('Null')
                    )
                    .with_columns(pl.concat_str(group, separator='_').alias('__group'))
                    .pivot('__group', index=['variable', 'value'], values='count')
                    .sort('variable', 'value')
                    .write_excel(wb, worksheet='string count (pivot)')
                )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment