Skip to content

Instantly share code, notes, and snippets.

@dustalov
Last active August 27, 2023 16:53
Show Gist options
  • Save dustalov/41678b70c40ba5a55430fa5e77b121d9 to your computer and use it in GitHub Desktop.
Save dustalov/41678b70c40ba5a55430fa5e77b121d9 to your computer and use it in GitHub Desktop.
Answer Aggregation with Bradley-Terry
#!/usr/bin/env python3
"""
An implementation of the Bradley-Terry ranking aggregation algorithm from the paper
MM algorithms for generalized Bradley-Terry models
<https://doi.org/10.1214/aos/1079120141>.
"""
__author__ = 'Dmitry Ustalov'
__copyright__ = 'Copyright 2021 Dmitry Ustalov'
__license__ = 'MIT' # https://opensource.org/licenses/MIT
import numpy as np
import numpy.typing as npt
EPS = 1e-8
# M_ij indicates the number of times item 'i' ranks higher than item 'j'
M: npt.NDArray[np.int64] = np.array([
[0, 1, 2, 0, 1],
[2, 0, 2, 1, 0],
[1, 2, 0, 0, 1],
[1, 2, 1, 0, 2],
[2, 0, 1, 3, 0],
], dtype=np.int64)
def main() -> None:
T = M.T + M
active = T > 0
w = M.sum(axis=1)
Z = np.zeros_like(M, dtype=float)
p = np.ones(M.shape[0])
p_new = p.copy()
converged, iterations = False, 0
while not converged:
iterations += 1
P = np.broadcast_to(p, M.shape)
Z[active] = T[active] / (P[active] + P.T[active])
p_new[:] = w
p_new /= Z.sum(axis=0)
p_new /= p_new.sum()
converged = bool(np.linalg.norm(p_new - p) < EPS)
p[:] = p_new
print(f'{iterations} iteration(s)')
print(p) # [0.12151104 0.15699947 0.11594851 0.31022851 0.29531247]
print(p[:, np.newaxis] / (p + p[:, np.newaxis]))
if __name__ == '__main__':
main()
#!/usr/bin/env python3
"""
A brute-force implementation of the Bradley-Terry ranking aggregation algorithm from the paper
MM algorithms for generalized Bradley-Terry models
<https://doi.org/10.1214/aos/1079120141>.
"""
__author__ = 'Dmitry Ustalov'
__copyright__ = 'Copyright 2021 Dmitry Ustalov'
__license__ = 'MIT' # https://opensource.org/licenses/MIT
import numpy as np
import numpy.typing as npt
from scipy.optimize import fmin_cg
# M_ij indicates the number of times item 'i' ranks higher than item 'j'
M: npt.NDArray[np.int64] = np.array([
[0, 1, 2, 0, 1],
[2, 0, 2, 1, 0],
[1, 2, 0, 0, 1],
[1, 2, 1, 0, 2],
[2, 0, 1, 3, 0],
], dtype=np.int64)
def loss(p: npt.NDArray[np.float64], M: npt.NDArray[np.int64]) -> np.float64:
P = np.broadcast_to(p, M.shape).T # whole i-th row should be p_i
pi: np.float64 = (M * np.log(P)).sum()
pij: np.float64 = (M * np.log(P + P.T)).sum()
return pij - pi # likelihood is (pi - pij) and we need a loss function
def main() -> None:
p = fmin_cg(loss, np.ones(M.shape[0]), args=(M,))
p /= p.sum()
print(p) # close to [0.12151104 0.15699947 0.11594851 0.31022851 0.29531247]
print(p[:, np.newaxis] / (p + p[:, np.newaxis]))
if __name__ == '__main__':
main()
#!/usr/bin/env python3
"""
An implementation of the ranking aggregation algorithm from the paper
Efficient Computation of Rankings from Pairwise Comparisons
<https://www.jmlr.org/papers/v24/22-1086.html>.
"""
__author__ = 'Dmitry Ustalov'
__copyright__ = 'Copyright 2023 Dmitry Ustalov'
__license__ = 'MIT' # https://opensource.org/licenses/MIT
import numpy as np
import numpy.typing as npt
# M_ij indicates the number of times item 'i' ranks higher than item 'j'
M: npt.NDArray[np.int64] = np.array([
[0, 1, 2, 0, 1],
[2, 0, 2, 1, 0],
[1, 2, 0, 0, 1],
[1, 2, 1, 0, 2],
[2, 0, 1, 3, 0],
], dtype=np.int64)
# tie matrix
T = np.minimum(M, M.T)
# win matrix
W = M - T
EPS = 10e-6
def main() -> None:
np.random.seed(0)
pi = np.random.rand(M.shape[0])
v = np.random.rand()
converged, iterations = False, 0
while not converged:
iterations += 1
v_numerator = np.sum(
T * (pi[:, np.newaxis] + pi) /
(pi[:, np.newaxis] + pi + 2 * v * np.sqrt(pi[:, np.newaxis] * pi))
) / 2
v_denominator = np.sum(
W * 2 * np.sqrt(pi[:, np.newaxis] * pi) /
(pi[:, np.newaxis] + pi + 2 * v * np.sqrt(pi[:, np.newaxis] * pi))
)
v = v_numerator / v_denominator
pi_old = pi.copy()
pi_numerator = np.sum(
(W + T / 2) * (pi + v * np.sqrt(pi[:, np.newaxis] * pi)) /
(pi[:, np.newaxis] + pi + 2 * v * np.sqrt(pi[:, np.newaxis] * pi)),
axis=1
)
pi_denominator = np.sum(
(W + T / 2) * (1 + v * np.sqrt(pi[:, np.newaxis] * pi)) /
(pi[:, np.newaxis] + pi + 2 * v * np.sqrt(pi[:, np.newaxis] * pi)),
axis=0
)
pi = pi_numerator / pi_denominator
converged = np.allclose(pi / (pi + 1), pi_old / (pi_old + 1), rtol=EPS, atol=EPS)
print(f'{iterations} iteration(s)')
print(pi)
print(pi[:, np.newaxis] / (pi + pi[:, np.newaxis]))
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment