|
'''Build Polars string expressions from Python-style format strings.

```python
df = pl.DataFrame(
    {
        "id": [1, 23, 456],
        "value": [3.1415, 2.718, 1.414],
        "ratio": [0.1234, 0.5678, 0.9],
        "user -[|objects": [{"name": "Alice"}, {"name": "Bob"}, {"name": "Eve"}],
    }
)
expr = utils.polars.format_expr(
    """ID={id:03d},
{{}} | {{ | }}
{ `user -[|objects`.name[-2:50]
= },
{}
{0}{0}
value={value:.2f}, pct={ratio:.1%}"""
)

with pl.Config() as cfg:
    cfg.set_fmt_str_lengths(100)
    display(
        df,
        df.select(
            expr.alias("formatted"),
        )
    )
```

'''
|
|
|
import re |
|
from dataclasses import dataclass |
|
from string import Formatter |
|
from typing import Literal |
|
|
|
import polars as pl |
|
|
|
# regex to tokenize placeholder names, quoted names, and optional slices |
|
_TOKEN_RE = re.compile( |
|
r""" |
|
(?P<quoted>`(?P<quoted_name>(?:\\`|[^`])+)`) # backtick-quoted name, supports escaped backticks |
|
| (?P<name>\w+) # unquoted name (alphanumeric and underscore) |
|
| \[(?P<start>-?\d*):(?P<end>-?\d*)\] # an optional [start:end] supporting negatives |
|
""", |
|
re.VERBOSE, |
|
) |
|
|
|
# Regex to parse a full format-spec per Python's mini-language:
#   [[fill]align][sign]["z"]["#"]["0"][width][grouping]["." precision][type]
# Every component is optional, so the empty spec matches too.  Use ``fullmatch``
# so that trailing garbage is rejected instead of silently ignored.
RE_FORMAT_SPEC = re.compile(
    r"(?:(?P<fill>[\s\S])?(?P<align>[<>=^]))?"
    r"(?P<sign>[- +])?"
    r"(?P<pos_zero>z)?"
    r"(?P<alt>#)?"
    r"(?P<zero_padding>0)?"
    r"(?P<width_str>\d+)?"
    r"(?P<grouping>[_,])?"
    r"(?:(?P<decimal>\.)(?P<precision_str>\d+))?"
    r"(?P<type>[bcdeEfFgGnosxX%])?"
)
"""taken from
https://stackoverflow.com/a/78351366/29868685
with license
https://creativecommons.org/licenses/by-sa/4.0/
"""
|
|
|
@dataclass |
|
class FormatSpec: |
|
fill: str | None = None |
|
align: Literal["<", ">", "=", "^"] | None = None |
|
sign: Literal["+", "-", " "] | None = None |
|
pos_zero: Literal["z"] | None = None |
|
alt: Literal["#"] | None = None |
|
zero_padding: Literal["0"] | None = None |
|
width_str: str | None = None |
|
grouping: Literal["_", ","] | None = None |
|
decimal: Literal["."] | None = None |
|
precision_str: str | None = None |
|
type: ( |
|
Literal["b", "c", "d", "e", "E", "f", "F", "g", "G", "n", "o", "s", "x", "X", "%"] | None |
|
) = None |
|
"""Most of this class is taken from |
|
https://stackoverflow.com/a/78351366/29868685 |
|
with license |
|
https://creativecommons.org/licenses/by-sa/4.0/ |
|
""" |
|
|
|
@property |
|
def width(self) -> int: |
|
return int(self.width_str) if self.width_str else 0 |
|
|
|
@width.setter |
|
def width(self, val: int) -> None: |
|
self.width_str = str(val) |
|
|
|
@property |
|
def precision(self) -> int | None: |
|
"""In contrast to source this will return None, if precision is not specified""" |
|
return int(self.precision_str) if self.precision_str is not None else None |
|
|
|
@precision.setter |
|
def precision(self, val: int) -> None: |
|
self.precision_str = str(val) |
|
|
|
@classmethod |
|
def from_spec(cls, format_spec: str): |
|
"""This was a specific function in the source, not a classmethod""" |
|
m = RE_FORMAT_SPEC.fullmatch(format_spec) |
|
if m: |
|
return cls(**m.groupdict()) # type: ignore |
|
return cls() |
|
|
|
def __str__(self) -> str: # noqa: C901 |
|
# Original version |
|
# return "".join(v for v in asdict(self).values() if v is not None) |
|
|
|
# Less succinct but probably more precise |
|
parts: list[str] = [] |
|
if self.fill is not None and self.align is not None: |
|
parts.append(f"{self.fill}{self.align}") |
|
elif self.align is not None: |
|
parts.append(self.align) |
|
if self.sign: |
|
parts.append(self.sign) |
|
if self.pos_zero: |
|
parts.append(self.pos_zero) |
|
if self.alt: |
|
parts.append(self.alt) |
|
if self.zero_padding: |
|
parts.append(self.zero_padding) |
|
if self.width_str: |
|
parts.append(self.width_str) |
|
if self.grouping: |
|
parts.append(self.grouping) |
|
if self.decimal and self.precision_str: |
|
parts.append(f"{self.decimal}{self.precision_str}") |
|
if self.type: |
|
parts.append(self.type) |
|
return "".join(parts) |
|
|
|
|
|
def _parse_placeholder(spec: str) -> pl.Expr:
    """Build a Polars Expr for field specs, supporting:
    - quoted column names with backticks (e.g. `col-name`)
    - alphanumeric names
    - struct fields
    - string slicing, _e.g._ `[:-3]` or `[5:7]`, without step

    Turn `foo.bar[2:5]` into `pl.col("foo").struct.field("bar").str.slice(2, 3)`.

    The first name token selects a column; every further name selects a
    struct field of the expression built so far.  A ``[start:end]`` slice is
    translated to ``str.slice(offset, length)`` following Python slice
    semantics: negative bounds count from the end of the string and an
    empty window (``end`` before ``start``) yields an empty string.

    Raises:
        ValueError: if a slice is not preceded by a field name, if anything
            follows a slice, or if ``spec`` contains no usable token.
    """
    expr: pl.Expr | None = None
    sliced = False
    for m in _TOKEN_RE.finditer(spec):
        if sliced:
            raise ValueError("String slice must be last part of field placeholder")

        quoted_name = m.group("quoted_name")
        if quoted_name:
            # un-escape backticks that were escaped inside the quoted name
            col = quoted_name.replace("\\`", "`")
            expr = pl.col(col) if expr is None else expr.struct.field(col)
            continue

        name = m.group("name")
        if name:
            expr = pl.col(name) if expr is None else expr.struct.field(name)
            continue

        # Otherwise the token is a [start:end] slice of a string expr.
        if expr is None:
            raise ValueError("A valid field name has to precede a slicing range")
        # A slice can only be the last part of the placeholder; the flag makes
        # the next iteration (if any) raise.
        sliced = True

        start_str, end_str = m.group("start"), m.group("end")
        start = int(start_str) if start_str else 0
        if not end_str:
            # open-ended slice
            expr = expr.str.slice(start)
            continue

        end = int(end_str)
        if (start < 0) == (end < 0):
            # Both bounds are measured from the same side, so the window width
            # does not depend on the string length.
            expr = expr.str.slice(start, max(end - start, 0))
            continue

        # Bounds are measured from different sides: the width depends on the
        # per-row string length.  Cast to a signed type because len_chars()
        # is unsigned and the arithmetic below can go negative.
        n_chars = expr.str.len_chars().cast(pl.Int64)
        if start < 0:
            # Resolve the negative start to an absolute, clamped offset.
            offset = (n_chars + start).clip(lower_bound=0)
            length = (pl.lit(end, dtype=pl.Int64) - offset).clip(lower_bound=0)
            expr = expr.str.slice(offset, length)
        else:
            # Negative end counts from the end of the string.
            length = (n_chars + end - start).clip(lower_bound=0)
            expr = expr.str.slice(start, length)

    if expr is None:
        raise ValueError(f"Invalid placeholder spec: {spec!r}")
    return expr
|
|
|
|
|
def _apply_format_spec(expr: pl.Expr, fmt_spec: str) -> pl.Expr:
    """Apply parsed format spec to a Polars Expr.

    Only part of the mini-language is honoured:
    - ``f F e E g G``: round to the precision (if given), then cast to String.
    - ``b c d o x X n``: cast to Int64, then to String (no base conversion).
    - ``%``: multiply by 100, round to the precision (if given), append "%".
    - zero padding (``0`` flag together with a width): ``str.zfill(width)``.

    A ``fmt_spec`` that ``FormatSpec.from_spec`` cannot parse results in an
    empty spec, in which case ``expr`` is returned unchanged.
    """
    spec = FormatSpec.from_spec(fmt_spec)
    # Numeric types
    if spec.type in ("f", "F", "e", "E", "g", "G"):
        if spec.precision is not None:
            expr = expr.round(spec.precision)
        expr = expr.cast(pl.String)
    elif spec.type in ("b", "c", "d", "o", "x", "X", "n"):
        expr = expr.cast(pl.Int64).cast(pl.String)
    elif spec.type == "%":
        expr = expr * 100.0
        if spec.precision is not None:
            expr = expr.round(spec.precision)
        expr = pl.concat_str(expr, pl.lit("%"))
    if spec.zero_padding and spec.width:
        # With no type (or type "s") nothing above cast the expr to String,
        # so cast here; the ``str`` namespace requires a String input.
        expr = expr.cast(pl.String).str.zfill(spec.width)
    # TODO: handle fill, align, grouping, sign, alt
    return expr
|
|
|
|
|
def format_expr(fmt: str) -> pl.Expr:
    """
    Parse a Python-format string into a Polars Expr using `polars.concat_str`,
    via `string.Formatter.parse`, with support for:
    - empty fields {} => positional column indexing via pl.nth
    - numeric fields {0}, {1} => repeat or reorder via pl.nth
    - quoted column names with backticks (`col-name`)
    - field name echo, preserving whitespace around '=' (like '{ x = }')
    - struct fields and string slicing `[start:end]` (no step)
    - Python format-spec (precision, zero-padding, etc.)
    - Allows for {{, }} like escapes

    ```python
    expr = format_expr("{x =:.2f}, name={user.name}, pct={ratio:.1%}")
    ```

    Unlike `str.format`, automatic (`{}`) and explicit (`{0}`) numbering may
    be mixed.  Conversion flags (`!r`, `!s`, `!a`) are ignored.  An empty
    `fmt` yields an empty string literal.

    TODO: Possible improvements
    - Maybe change numeric and empty lookups to take on `*args: str | pl.Expr` and index into that
    - Allow for list/array slicing via (:)
    """
    exprs: list[pl.Expr] = []
    auto_idx = 0

    for literal, field, spec, _ in Formatter().parse(fmt):
        if literal:
            exprs.append(pl.lit(literal, dtype=pl.String))
        if field is None:
            # trailing literal text without a replacement field
            continue

        stripped = field.strip()
        if not stripped:
            # "{}": next automatically numbered column
            expr = pl.nth(auto_idx)
            auto_idx += 1
        elif stripped.isdecimal():
            # "{0}": explicit column position.  isdecimal() rather than
            # isdigit(), which also accepts characters int() cannot parse.
            expr = pl.nth(int(stripped))
        else:
            # detect debugging echo syntax with optional whitespace around '='
            if stripped.endswith("="):
                # preserve the exact whitespace + '=' string
                exprs.append(pl.lit(field, dtype=pl.String))
                stripped = stripped.removesuffix("=").strip()
            expr = _parse_placeholder(stripped)

        if spec:
            expr = _apply_format_spec(expr, spec)
        exprs.append(expr)

    if not exprs:
        # Nothing to concatenate for an empty template.
        return pl.lit("", dtype=pl.String)
    return pl.concat_str(exprs)