Skip to content

Instantly share code, notes, and snippets.

@pybokeh
Last active October 12, 2023 11:17
Show Gist options
  • Save pybokeh/9fd661dd3c430da2a8dcbb65c8e3d007 to your computer and use it in GitHub Desktop.
Save pybokeh/9fd661dd3c430da2a8dcbb65c8e3d007 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
@cpcloud
Copy link

cpcloud commented Oct 12, 2023

Here's a version that uses an Ibis UDF:

import pyarrow as pa

import ibis
from ibis import _, udf

# Turn on Ibis interactive mode.
# NOTE(review): presumably this is what lets print() on an expression render
# the executed result table rather than the expression itself - confirm
# against the Ibis documentation.
ibis.options.interactive = True

# Open a DuckDB connection through Ibis; no database path is passed.
client = ibis.duckdb.connect()
# Register the rivet failure CSV as a table expression for the pipeline.
failures = client.read_csv("/data/rivet_failures.csv")


@udf.scalar.pyarrow
def adjusted_rank(n: int, col_status: str, col_rev_rank: int) -> float:
    """Compute the adjusted rank of each row from its status and reverse rank.

    The three arguments are walked in lockstep. A row whose status is
    "SUSPENDED" yields a null. Any other row yields

        (reverse_rank * previous + (n + 1)) / (reverse_rank + 1)

    where ``previous`` is the adjusted rank of the most recent
    non-suspended row handled in this call, starting from zero.

    Args:
        n: Per-row value used as ``n`` in the formula above.
        col_status: Per-row status labels.
        col_rev_rank: Per-row reverse ranks.

    Returns:
        A PyArrow array with one entry per input row.

    NOTE(review): each argument is used via ``.to_numpy()``, so at runtime
    these are array-like PyArrow objects; the scalar annotations presumably
    describe the element types for Ibis - confirm.
    NOTE(review): the running value restarts on every call, so the result
    depends on all rows arriving in order within a single call - confirm
    how the backend batches rows.
    """
    # Adjusted rank carried over from the last non-suspended row.
    running = 0
    results = []

    rows = zip(n.to_numpy(), col_status.to_numpy(), col_rev_rank.to_numpy())
    for total, status, rev_rank in rows:
        if status == "SUSPENDED":
            # Suspended rows get no rank and leave the running value alone.
            results.append(None)
            continue

        running = (rev_rank * running + (total + 1)) / (rev_rank + 1)
        results.append(running)

    return pa.array(results)


# Label a row FAILED when its failure mode is "Flair Failure"; every other
# row is treated as SUSPENDED.
failure_status = (
    ibis.case()
    .when(_.failure_mode == "Flair Failure", "FAILED")
    .else_("SUSPENDED")
    .end()
)

with_status = failures.mutate(status=failure_status)

# Order by failure time before numbering the rows.
by_time = with_status.order_by(_.failure_time_minutes)
with_rank = by_time.mutate(rank=ibis.row_number() + 1)

# Reverse rank counts down from the total number of rows in `failures`.
with_reverse_rank = with_rank.mutate(reverse_rank=failures.count() + 1 - _.rank)

# Feed the row count, status and reverse rank into the adjusted-rank UDF.
ranks = with_reverse_rank.mutate(
    adjusted_rank=adjusted_rank(_.count(), _.status, _.reverse_rank)
)

print(ranks)

which gives

┏━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ serial_number ┃ failure_time_minutes ┃ failure_mode   ┃ status    ┃ rank  ┃ reverse_rank ┃ adjusted_rank ┃
┡━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ int64         │ int64                │ string         │ string    │ int64 │ int64        │ float64       │
├───────────────┼──────────────────────┼────────────────┼───────────┼───────┼──────────────┼───────────────┤
│             7 │                   10 │ Lug failed     │ SUSPENDED │     1 │            8 │           nan │
│             4 │                   30 │ Flair Failure  │ FAILED    │     2 │            7 │        1.1250 │
│             6 │                   45 │ Flair loosened │ SUSPENDED │     3 │            6 │           nan │
│             5 │                   49 │ Flair Failure  │ FAILED    │     4 │            5 │        2.4375 │
│             8 │                   82 │ Flair Failure  │ FAILED    │     5 │            4 │        3.7500 │
│             1 │                   90 │ Flair Failure  │ FAILED    │     6 │            3 │        5.0625 │
│             2 │                   96 │ Flair Failure  │ FAILED    │     7 │            2 │        6.3750 │
│             3 │                  100 │ Flair loosened │ SUSPENDED │     8 │            1 │           nan │
└───────────────┴──────────────────────┴────────────────┴───────────┴───────┴──────────────┴───────────────┘

The main differences are:

  1. DuckDB UDFs only support PyArrow input and output, so my implementation is using PyArrow APIs.
  2. Ibis doesn't allow mixing types such as strings and floats in a single column (and most of our backends don't support it either), so I replaced the string you had in the adjusted_rank column with None, which pandas converts into NaN.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment