Last active
January 21, 2020 09:01
-
-
Save twolodzko/7b5be323780921eb93a49dbed970d18b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| This is a pure python implementation of t-digest algorithm 1. Parts of the code are borrowed | |
| from https://github.com/CamDavidsonPilon/tdigest | |
| Code is released under The MIT License (MIT). | |
| """ | |
| import math | |
| from typing import List, Tuple, Union, Optional | |
| import numpy as np | |
| Array = Union[np.ndarray, List[float]] | |
| def length(arr): | |
| """Calculate length of any object, including None and scalars | |
| >>> import numpy as np | |
| >>> length(371) | |
| 1 | |
| >>> length(None) | |
| 0 | |
| >>> length([]) | |
| 0 | |
| >>> length(np.array([])) | |
| 0 | |
| >>> length([1, 2, 3]) | |
| 3 | |
| >>> length(np.ones(5)) | |
| 5 | |
| """ | |
| if arr is None: | |
| return 0 | |
| try: | |
| return len(arr) | |
| except TypeError: | |
| return 1 | |
| class WeightedArray: | |
| def __init__( | |
| self, | |
| means: Optional[Array] = None, | |
| counts: Optional[Array] = None, | |
| totals: Optional[Array] = None, | |
| dtype: Union[str, np.dtype] = 'float32' | |
| ) -> None: | |
| """ | |
| >>> WeightedArray() | |
| [] | |
| >>> WeightedArray(1) | |
| [(1.0, 1.0)] | |
| >>> WeightedArray(1, 2) | |
| [(1.0, 2.0)] | |
| >>> WeightedArray([1, 2, 3]) | |
| [(1.0, 1.0), (2.0, 1.0), (3.0, 1.0)] | |
| >>> WeightedArray(totals=[1, 2, 3], counts=[2, 1, 4]) | |
| [(0.5, 2.0), (2.0, 1.0), (0.75, 4.0)] | |
| """ | |
| self.dtype = dtype | |
| if length(counts): | |
| self.counts = np.atleast_1d(counts) | |
| else: | |
| self.counts = np.ones(length(means) or length(totals)) | |
| self.counts = self.counts.astype(self.dtype) | |
| if means is not None: | |
| self.totals = np.atleast_1d(means * self.counts).astype(self.dtype) | |
| else: | |
| if totals is None: | |
| totals = [] | |
| self.totals = np.atleast_1d(totals).astype(self.dtype) | |
| if len(self.totals) != len(self.counts): | |
| raise ValueError | |
| if np.any(self.counts < 0): | |
| raise ValueError | |
| @property | |
| def means(self) -> np.ndarray: | |
| """ | |
| >>> WeightedArray(42).means | |
| array([42.], dtype=float32) | |
| >>> WeightedArray(means=10, counts=2).means | |
| array([10.], dtype=float32) | |
| >>> WeightedArray(totals=[10, 12, 3], counts=[2, 4, 1]).means | |
| array([5., 3., 3.], dtype=float32) | |
| """ | |
| return self.totals / self.counts | |
| @property | |
| def cumulative_counts(self) -> np.ndarray: | |
| """ | |
| >>> WeightedArray([7, 3, 4], [1, 1, 2]).cumulative_counts | |
| array([1., 2., 4.], dtype=float32) | |
| """ | |
| return np.cumsum(self.counts) | |
| @property | |
| def size(self) -> int: | |
| """ | |
| >>> WeightedArray([1, 2], [2, 4]).size | |
| 6 | |
| """ | |
| return int(np.sum(self.counts)) | |
| def __len__(self) -> int: | |
| """ | |
| >>> len(WeightedArray([1, 2, 4], [2, 4, 1])) | |
| 3 | |
| """ | |
| return len(self.totals) | |
| def __getitem__(self, i) -> 'WeightedArray': | |
| """ | |
| >>> WeightedArray(2) == WeightedArray([2, 8, 7])[0] | |
| True | |
| """ | |
| return WeightedArray(totals=self.totals[[i]], counts=self.counts[[i]]) | |
| def __iter__(self) -> 'WeightedArray': | |
| for i in range(len(self)): | |
| yield self[i] | |
| def __add__(self, other: 'WeightedArray') -> 'WeightedArray': | |
| """ | |
| >>> WeightedArray(1) + WeightedArray(2) | |
| [(1.5, 2.0)] | |
| >>> WeightedArray(1) + WeightedArray(2, 3) | |
| [(1.75, 4.0)] | |
| """ | |
| if not isinstance(other, self.__class__): | |
| raise TypeError | |
| if len(self) != len(other): | |
| raise IndexError | |
| return WeightedArray(totals=self.totals + other.totals, counts=self.counts + other.counts) | |
| def __or__(self, other: 'WeightedArray') -> 'WeightedArray': | |
| """ | |
| >>> WeightedArray([1, 2]) | WeightedArray([6, 12], [2, 3]) | |
| [(1.0, 1.0), (2.0, 1.0), (6.0, 2.0), (12.0, 3.0)] | |
| """ | |
| if not isinstance(other, self.__class__): | |
| raise TypeError | |
| totals = np.concatenate([self.totals, other.totals]) | |
| counts = np.concatenate([self.counts, other.counts]) | |
| return WeightedArray(totals=totals, counts=counts) | |
| def __eq__(self, other: 'WeightedArray') -> bool: | |
| """ | |
| >>> WeightedArray(1) == WeightedArray(1, 1) | |
| True | |
| """ | |
| if not isinstance(other, self.__class__): | |
| return False | |
| return np.all(self.totals == other.totals) and np.all(self.counts == other.counts) | |
| def __repr__(self) -> str: | |
| """ | |
| >>> WeightedArray([1, 4], [1, 2]) | |
| [(1.0, 1.0), (4.0, 2.0)] | |
| """ | |
| return repr(list(zip(self.means, self.counts))) | |
| def sort(self) -> 'WeightedArray': | |
| """ | |
| >>> WeightedArray([3, 1, 2]).sort().means | |
| array([1., 2., 3.], dtype=float32) | |
| """ | |
| idx = np.argsort(self.means) | |
| totals = self.totals[idx] | |
| counts = self.counts[idx] | |
| return WeightedArray(totals=totals, counts=counts) | |
| def mean(self) -> float: | |
| """ | |
| >>> import numpy as np | |
| >>> values = [2, 5, 3, 2, 1, 7] | |
| >>> (np.mean(values) - WeightedArray(values).mean()) < 1e-5 | |
| True | |
| >>> counts = [2, 3, 3, 5, 1, 7] | |
| >>> (np.mean(np.repeat(values, counts)) - WeightedArray(values, counts).mean()) < 1e-5 | |
| True | |
| """ | |
| return np.sum(self.totals * self.counts) / np.sum(self.counts) | |
| def quantile(self, p: float) -> float: | |
| if p == 0: | |
| return self.means[0] | |
| n = p * self.size | |
| t = self.counts[0] / 2 | |
| for i in range(1, len(self)-1): | |
| k = (self.counts[i-1] + self.counts[i]) / 2 | |
| if n < t + k: | |
| z1 = n - t | |
| z2 = t + k - n | |
| return (self.means[i-1] * z2 + self.means[i] * z1) / (z1 + z2) | |
| t += k | |
| return self.means[-1] | |
| def cdf(self, x: float) -> float: | |
| t = 0 | |
| for i in range(len(self)): | |
| if i < len(self)-1: | |
| D = (self.means[i+1] - self.means[i])/2 | |
| else: | |
| D = (self.means[i] - self.means[i-1])/2 | |
| z = max(-1, (x - self.means[i])/D) | |
| if z < 1: | |
| return (t/self.size) + (self.counts[i]/self.size) * (z+1)/2 | |
| t = t + self.counts[i] | |
| return 1 | |
| class TDigest: | |
| def __init__(self, delta: float, centroids: Optional['WeightedArray'] = None) -> None: | |
| self.delta = delta | |
| self.centroids = centroids or WeightedArray() | |
| if len(self.centroids) > math.ceil(self.delta): | |
| self.merge() | |
| def merge(self, values: List[float] = [], counts: List[float] = []) -> 'TDigest': | |
| """ | |
| Pack the data into smaller number of bins | |
| Algorithm 1 from the "Computing Extremely Accurate Quantiles Using t-Digests" | |
| paper by Ted Dunning & Otmar Ertl. | |
| >>> delta = 10 | |
| >>> data = [i for i in range(100)] | |
| >>> number_of_clusters = len(TDigest(delta).merge(values=data).centroids) | |
| >>> delta/2 <= number_of_clusters <= delta | |
| True | |
| """ | |
| centroids = WeightedArray() | |
| data = WeightedArray(means=values, counts=counts) | |
| data = (self.centroids | data).sort() | |
| sigma = data[0] | |
| q0 = 0 | |
| for i in range(1, len(data)): | |
| q = q0 + (sigma.counts + data[i].counts)/data.size | |
| if q <= self.qlimit(q0): | |
| sigma += data[i] | |
| else: | |
| centroids |= sigma | |
| q0 += sigma.counts/data.size | |
| sigma = data[i] | |
| self.centroids = centroids | sigma | |
| return self | |
| def qlimit(self, q0: float) -> float: | |
| return self.inv_bin_size(self.bin_size(q0) + 1) | |
| def bin_size(self, q: float) -> float: | |
| """ | |
| >>> TDigest(delta=1).bin_size(0.0) | |
| -0.25 | |
| >>> TDigest(delta=1).bin_size(0.5) | |
| 0.0 | |
| >>> TDigest(delta=1).bin_size(1.0) | |
| 0.25 | |
| """ | |
| return self.delta * math.asin(2*q - 1)/(2 * math.pi) | |
| def inv_bin_size(self, k: float) -> float: | |
| """ | |
| >>> TDigest(delta=1).inv_bin_size(-0.25) | |
| 0.0 | |
| >>> TDigest(delta=1).inv_bin_size(0.0) | |
| 0.5 | |
| >>> TDigest(delta=1).inv_bin_size(0.25) | |
| 1.0 | |
| """ | |
| return (math.sin(k * (2 * math.pi / self.delta)) + 1) / 2 | |
| if __name__ == "__main__": | |
| import doctest | |
| doctest.testmod() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment