Last active
August 27, 2023 16:53
-
-
Save dustalov/41678b70c40ba5a55430fa5e77b121d9 to your computer and use it in GitHub Desktop.
Answer Aggregation with Bradley-Terry
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
An implementation of the Bradley-Terry ranking aggregation algorithm from the paper | |
MM algorithms for generalized Bradley-Terry models | |
<https://doi.org/10.1214/aos/1079120141>. | |
""" | |
__author__ = 'Dmitry Ustalov' | |
__copyright__ = 'Copyright 2021 Dmitry Ustalov' | |
__license__ = 'MIT' # https://opensource.org/licenses/MIT | |
import numpy as np | |
import numpy.typing as npt | |
EPS = 1e-8 | |
# M_ij indicates the number of times item 'i' ranks higher than item 'j' | |
M: npt.NDArray[np.int64] = np.array([ | |
[0, 1, 2, 0, 1], | |
[2, 0, 2, 1, 0], | |
[1, 2, 0, 0, 1], | |
[1, 2, 1, 0, 2], | |
[2, 0, 1, 3, 0], | |
], dtype=np.int64) | |
def main() -> None: | |
T = M.T + M | |
active = T > 0 | |
w = M.sum(axis=1) | |
Z = np.zeros_like(M, dtype=float) | |
p = np.ones(M.shape[0]) | |
p_new = p.copy() | |
converged, iterations = False, 0 | |
while not converged: | |
iterations += 1 | |
P = np.broadcast_to(p, M.shape) | |
Z[active] = T[active] / (P[active] + P.T[active]) | |
p_new[:] = w | |
p_new /= Z.sum(axis=0) | |
p_new /= p_new.sum() | |
converged = bool(np.linalg.norm(p_new - p) < EPS) | |
p[:] = p_new | |
print(f'{iterations} iteration(s)') | |
print(p) # [0.12151104 0.15699947 0.11594851 0.31022851 0.29531247] | |
print(p[:, np.newaxis] / (p + p[:, np.newaxis])) | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
A brute-force implementation of the Bradley-Terry ranking aggregation algorithm from the paper | |
MM algorithms for generalized Bradley-Terry models | |
<https://doi.org/10.1214/aos/1079120141>. | |
""" | |
__author__ = 'Dmitry Ustalov' | |
__copyright__ = 'Copyright 2021 Dmitry Ustalov' | |
__license__ = 'MIT' # https://opensource.org/licenses/MIT | |
import numpy as np | |
import numpy.typing as npt | |
from scipy.optimize import fmin_cg | |
# M_ij indicates the number of times item 'i' ranks higher than item 'j' | |
M: npt.NDArray[np.int64] = np.array([ | |
[0, 1, 2, 0, 1], | |
[2, 0, 2, 1, 0], | |
[1, 2, 0, 0, 1], | |
[1, 2, 1, 0, 2], | |
[2, 0, 1, 3, 0], | |
], dtype=np.int64) | |
def loss(p: npt.NDArray[np.float64], M: npt.NDArray[np.int64]) -> np.float64: | |
P = np.broadcast_to(p, M.shape).T # whole i-th row should be p_i | |
pi: np.float64 = (M * np.log(P)).sum() | |
pij: np.float64 = (M * np.log(P + P.T)).sum() | |
return pij - pi # likelihood is (pi - pij) and we need a loss function | |
def main() -> None: | |
p = fmin_cg(loss, np.ones(M.shape[0]), args=(M,)) | |
p /= p.sum() | |
print(p) # close to [0.12151104 0.15699947 0.11594851 0.31022851 0.29531247] | |
print(p[:, np.newaxis] / (p + p[:, np.newaxis])) | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
An implementation of the ranking aggregation algorithm from the paper | |
Efficient Computation of Rankings from Pairwise Comparisons | |
<https://www.jmlr.org/papers/v24/22-1086.html>. | |
""" | |
__author__ = 'Dmitry Ustalov' | |
__copyright__ = 'Copyright 2023 Dmitry Ustalov' | |
__license__ = 'MIT' # https://opensource.org/licenses/MIT | |
import numpy as np | |
import numpy.typing as npt | |
# M_ij indicates the number of times item 'i' ranks higher than item 'j' | |
M: npt.NDArray[np.int64] = np.array([ | |
[0, 1, 2, 0, 1], | |
[2, 0, 2, 1, 0], | |
[1, 2, 0, 0, 1], | |
[1, 2, 1, 0, 2], | |
[2, 0, 1, 3, 0], | |
], dtype=np.int64) | |
# tie matrix | |
T = np.minimum(M, M.T) | |
# win matrix | |
W = M - T | |
EPS = 10e-6 | |
def main() -> None: | |
np.random.seed(0) | |
pi = np.random.rand(M.shape[0]) | |
v = np.random.rand() | |
converged, iterations = False, 0 | |
while not converged: | |
iterations += 1 | |
v_numerator = np.sum( | |
T * (pi[:, np.newaxis] + pi) / | |
(pi[:, np.newaxis] + pi + 2 * v * np.sqrt(pi[:, np.newaxis] * pi)) | |
) / 2 | |
v_denominator = np.sum( | |
W * 2 * np.sqrt(pi[:, np.newaxis] * pi) / | |
(pi[:, np.newaxis] + pi + 2 * v * np.sqrt(pi[:, np.newaxis] * pi)) | |
) | |
v = v_numerator / v_denominator | |
pi_old = pi.copy() | |
pi_numerator = np.sum( | |
(W + T / 2) * (pi + v * np.sqrt(pi[:, np.newaxis] * pi)) / | |
(pi[:, np.newaxis] + pi + 2 * v * np.sqrt(pi[:, np.newaxis] * pi)), | |
axis=1 | |
) | |
pi_denominator = np.sum( | |
(W + T / 2) * (1 + v * np.sqrt(pi[:, np.newaxis] * pi)) / | |
(pi[:, np.newaxis] + pi + 2 * v * np.sqrt(pi[:, np.newaxis] * pi)), | |
axis=0 | |
) | |
pi = pi_numerator / pi_denominator | |
converged = np.allclose(pi / (pi + 1), pi_old / (pi_old + 1), rtol=EPS, atol=EPS) | |
print(f'{iterations} iteration(s)') | |
print(pi) | |
print(pi[:, np.newaxis] / (pi + pi[:, np.newaxis])) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment