Skip to content

Instantly share code, notes, and snippets.

@twolodzko
Last active January 21, 2020 09:01
Show Gist options
  • Select an option

  • Save twolodzko/7b5be323780921eb93a49dbed970d18b to your computer and use it in GitHub Desktop.

Select an option

Save twolodzko/7b5be323780921eb93a49dbed970d18b to your computer and use it in GitHub Desktop.
"""
This is a pure python implementation of t-digest algorithm 1. Parts of the code are borrowed
from https://github.com/CamDavidsonPilon/tdigest
Code is released under The MIT License (MIT).
"""
import math
from typing import List, Tuple, Union, Optional
import numpy as np
Array = Union[np.ndarray, List[float]]
def length(arr):
"""Calculate length of any object, including None and scalars
>>> import numpy as np
>>> length(371)
1
>>> length(None)
0
>>> length([])
0
>>> length(np.array([]))
0
>>> length([1, 2, 3])
3
>>> length(np.ones(5))
5
"""
if arr is None:
return 0
try:
return len(arr)
except TypeError:
return 1
class WeightedArray:
def __init__(
self,
means: Optional[Array] = None,
counts: Optional[Array] = None,
totals: Optional[Array] = None,
dtype: Union[str, np.dtype] = 'float32'
) -> None:
"""
>>> WeightedArray()
[]
>>> WeightedArray(1)
[(1.0, 1.0)]
>>> WeightedArray(1, 2)
[(1.0, 2.0)]
>>> WeightedArray([1, 2, 3])
[(1.0, 1.0), (2.0, 1.0), (3.0, 1.0)]
>>> WeightedArray(totals=[1, 2, 3], counts=[2, 1, 4])
[(0.5, 2.0), (2.0, 1.0), (0.75, 4.0)]
"""
self.dtype = dtype
if length(counts):
self.counts = np.atleast_1d(counts)
else:
self.counts = np.ones(length(means) or length(totals))
self.counts = self.counts.astype(self.dtype)
if means is not None:
self.totals = np.atleast_1d(means * self.counts).astype(self.dtype)
else:
if totals is None:
totals = []
self.totals = np.atleast_1d(totals).astype(self.dtype)
if len(self.totals) != len(self.counts):
raise ValueError
if np.any(self.counts < 0):
raise ValueError
@property
def means(self) -> np.ndarray:
"""
>>> WeightedArray(42).means
array([42.], dtype=float32)
>>> WeightedArray(means=10, counts=2).means
array([10.], dtype=float32)
>>> WeightedArray(totals=[10, 12, 3], counts=[2, 4, 1]).means
array([5., 3., 3.], dtype=float32)
"""
return self.totals / self.counts
@property
def cumulative_counts(self) -> np.ndarray:
"""
>>> WeightedArray([7, 3, 4], [1, 1, 2]).cumulative_counts
array([1., 2., 4.], dtype=float32)
"""
return np.cumsum(self.counts)
@property
def size(self) -> int:
"""
>>> WeightedArray([1, 2], [2, 4]).size
6
"""
return int(np.sum(self.counts))
def __len__(self) -> int:
"""
>>> len(WeightedArray([1, 2, 4], [2, 4, 1]))
3
"""
return len(self.totals)
def __getitem__(self, i) -> 'WeightedArray':
"""
>>> WeightedArray(2) == WeightedArray([2, 8, 7])[0]
True
"""
return WeightedArray(totals=self.totals[[i]], counts=self.counts[[i]])
def __iter__(self) -> 'WeightedArray':
for i in range(len(self)):
yield self[i]
def __add__(self, other: 'WeightedArray') -> 'WeightedArray':
"""
>>> WeightedArray(1) + WeightedArray(2)
[(1.5, 2.0)]
>>> WeightedArray(1) + WeightedArray(2, 3)
[(1.75, 4.0)]
"""
if not isinstance(other, self.__class__):
raise TypeError
if len(self) != len(other):
raise IndexError
return WeightedArray(totals=self.totals + other.totals, counts=self.counts + other.counts)
def __or__(self, other: 'WeightedArray') -> 'WeightedArray':
"""
>>> WeightedArray([1, 2]) | WeightedArray([6, 12], [2, 3])
[(1.0, 1.0), (2.0, 1.0), (6.0, 2.0), (12.0, 3.0)]
"""
if not isinstance(other, self.__class__):
raise TypeError
totals = np.concatenate([self.totals, other.totals])
counts = np.concatenate([self.counts, other.counts])
return WeightedArray(totals=totals, counts=counts)
def __eq__(self, other: 'WeightedArray') -> bool:
"""
>>> WeightedArray(1) == WeightedArray(1, 1)
True
"""
if not isinstance(other, self.__class__):
return False
return np.all(self.totals == other.totals) and np.all(self.counts == other.counts)
def __repr__(self) -> str:
"""
>>> WeightedArray([1, 4], [1, 2])
[(1.0, 1.0), (4.0, 2.0)]
"""
return repr(list(zip(self.means, self.counts)))
def sort(self) -> 'WeightedArray':
"""
>>> WeightedArray([3, 1, 2]).sort().means
array([1., 2., 3.], dtype=float32)
"""
idx = np.argsort(self.means)
totals = self.totals[idx]
counts = self.counts[idx]
return WeightedArray(totals=totals, counts=counts)
def mean(self) -> float:
"""
>>> import numpy as np
>>> values = [2, 5, 3, 2, 1, 7]
>>> (np.mean(values) - WeightedArray(values).mean()) < 1e-5
True
>>> counts = [2, 3, 3, 5, 1, 7]
>>> (np.mean(np.repeat(values, counts)) - WeightedArray(values, counts).mean()) < 1e-5
True
"""
return np.sum(self.totals * self.counts) / np.sum(self.counts)
def quantile(self, p: float) -> float:
if p == 0:
return self.means[0]
n = p * self.size
t = self.counts[0] / 2
for i in range(1, len(self)-1):
k = (self.counts[i-1] + self.counts[i]) / 2
if n < t + k:
z1 = n - t
z2 = t + k - n
return (self.means[i-1] * z2 + self.means[i] * z1) / (z1 + z2)
t += k
return self.means[-1]
def cdf(self, x: float) -> float:
t = 0
for i in range(len(self)):
if i < len(self)-1:
D = (self.means[i+1] - self.means[i])/2
else:
D = (self.means[i] - self.means[i-1])/2
z = max(-1, (x - self.means[i])/D)
if z < 1:
return (t/self.size) + (self.counts[i]/self.size) * (z+1)/2
t = t + self.counts[i]
return 1
class TDigest:
def __init__(self, delta: float, centroids: Optional['WeightedArray'] = None) -> None:
self.delta = delta
self.centroids = centroids or WeightedArray()
if len(self.centroids) > math.ceil(self.delta):
self.merge()
def merge(self, values: List[float] = [], counts: List[float] = []) -> 'TDigest':
"""
Pack the data into smaller number of bins
Algorithm 1 from the "Computing Extremely Accurate Quantiles Using t-Digests"
paper by Ted Dunning & Otmar Ertl.
>>> delta = 10
>>> data = [i for i in range(100)]
>>> number_of_clusters = len(TDigest(delta).merge(values=data).centroids)
>>> delta/2 <= number_of_clusters <= delta
True
"""
centroids = WeightedArray()
data = WeightedArray(means=values, counts=counts)
data = (self.centroids | data).sort()
sigma = data[0]
q0 = 0
for i in range(1, len(data)):
q = q0 + (sigma.counts + data[i].counts)/data.size
if q <= self.qlimit(q0):
sigma += data[i]
else:
centroids |= sigma
q0 += sigma.counts/data.size
sigma = data[i]
self.centroids = centroids | sigma
return self
def qlimit(self, q0: float) -> float:
return self.inv_bin_size(self.bin_size(q0) + 1)
def bin_size(self, q: float) -> float:
"""
>>> TDigest(delta=1).bin_size(0.0)
-0.25
>>> TDigest(delta=1).bin_size(0.5)
0.0
>>> TDigest(delta=1).bin_size(1.0)
0.25
"""
return self.delta * math.asin(2*q - 1)/(2 * math.pi)
def inv_bin_size(self, k: float) -> float:
"""
>>> TDigest(delta=1).inv_bin_size(-0.25)
0.0
>>> TDigest(delta=1).inv_bin_size(0.0)
0.5
>>> TDigest(delta=1).inv_bin_size(0.25)
1.0
"""
return (math.sin(k * (2 * math.pi / self.delta)) + 1) / 2
if __name__ == "__main__":
import doctest
doctest.testmod()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment