Created
July 4, 2024 03:09
-
-
Save jgrizzled/d9d5b60ed5cbe5eafa8745632ec02de9 to your computer and use it in GitHub Desktop.
Sybil attack simulation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random
from typing import List, Tuple

import matplotlib.pyplot as plt
class User:
    """A single account taking part in the simulation.

    Attributes:
        is_attacker: True when the account is controlled by the attacker.
        verified: True while the account counts as verified.
    """

    def __init__(self, is_attacker=False, verified=False):
        self.is_attacker = is_attacker
        self.verified = verified

    def __repr__(self):
        return (
            f"{type(self).__name__}"
            f"(is_attacker={self.is_attacker!r}, verified={self.verified!r})"
        )
class Pair:
    """Two users matched with each other for one verification round.

    Attributes:
        user1, user2: the two members of the pair.
        verified: True once ``verify`` has been called.
        disputed: True once ``dispute`` has been processed for this pair.
    """

    def __init__(self, user1: 'User', user2: 'User'):
        self.user1 = user1
        self.user2 = user2
        self.verified = False
        self.disputed = False

    def verify(self):
        """Mark the pair and both of its members as verified."""
        self.user1.verified = True
        self.user2.verified = True
        self.verified = True

    def dispute(self, pairs: List['Pair'], courts: List['Court']):
        """Break up the pair and send each member to a court.

        Each member is put in front of a pair drawn at random from ``pairs``
        (never this pair itself); the resulting courts are appended to
        ``courts``. Does nothing if the pair is already verified or disputed.

        Args:
            pairs: every pair of the current period, including this one.
            courts: list that receives the newly created courts (mutated).
        """
        if self.disputed or self.verified:
            return

        other_pairs = [p for p in pairs if p is not self]
        # With no other pair available there is nobody to judge the members
        # (random.choice would raise IndexError on an empty list), so the
        # dispute is recorded without creating courts.
        if other_pairs:
            for member in (self.user1, self.user2):
                courts.append(Court(member, random.choice(other_pairs)))

        self.disputed = True
class Court:
    """A single user put in front of a pair that decides on the user.

    Attributes:
        user: the user being judged.
        pair: the pair acting as the court.
        verified: True once ``verify`` has succeeded.
    """

    def __init__(self, user: 'User', pair: 'Pair'):
        self.user = user
        self.pair = pair
        self.verified = False

    def verify(self):
        """Mark the judged user as verified.

        Raises:
            RuntimeError: if the judging pair has not been verified itself.
        """
        if not self.pair.verified:
            # RuntimeError is still caught by callers handling Exception.
            raise RuntimeError("Pair not verified")
        self.user.verified = True
        self.verified = True
class System:
    """The simulated network: a user population re-verified once per period.

    Attributes:
        starting_non_attackers: number of honest users created at start-up
            (the requested number, bumped to the next odd value if even).
        p_fv: probability that a bribe/trick by the attacker succeeds.
        non_attacker_growth_rate: honest invites per honest user and period.
        invites_per_user: invites every user can hand out per period.
        fv_cost: attacker cost of one bribe/trick attempt.
        users: the current population.
        attacker_cost: cumulative cost paid by the attacker so far.
    """

    def __init__(self, starting_non_attackers, p_fv, non_attacker_growth_rate, fv_cost, invites_per_user):
        # An odd number of honest users plus the single attacker yields an
        # even population, so the lone attacker cannot end up unpaired in the
        # first period.
        if starting_non_attackers % 2 == 0:
            starting_non_attackers += 1
        self.starting_non_attackers = starting_non_attackers

        self.p_fv = p_fv
        self.non_attacker_growth_rate = non_attacker_growth_rate
        self.invites_per_user = invites_per_user
        self.fv_cost = fv_cost

        # Start with one attacker ...
        self.users = [User(is_attacker=True, verified=True)]
        # ... plus exactly the (odd-adjusted) number of honest users, so the
        # population matches the stored attribute and the parity guarantee
        # above actually holds.
        self.users.extend(User(verified=True) for _ in range(self.starting_non_attackers))
        self.attacker_cost = 0

    def _pair_up(self) -> List['Pair']:
        """Shuffle the population and match neighbours into pairs.

        With an odd population the last user after shuffling gets no pair.
        """
        random.shuffle(self.users)
        return [Pair(a, b) for a, b in zip(self.users[0::2], self.users[1::2])]

    def _invite_new_users(self) -> List['User']:
        """Create this period's new accounts and charge the attacker for bribed invites."""
        attackers = sum(1 for u in self.users if u.is_attacker)
        non_attackers = len(self.users) - attackers

        # Invites the attacker obtains from honest users by bribing/tricking.
        bribed_invites = int(non_attackers * self.p_fv * self.invites_per_user)
        self.attacker_cost += self.fv_cost * bribed_invites
        attacker_invites = bribed_invites + attackers * self.invites_per_user
        # NOTE(review): one account fewer than the number of invites is
        # created, here and for honest users below. Kept as in the original
        # model; confirm whether the "- 1" is intended.
        newcomers = [User(is_attacker=True) for _ in range(attacker_invites - 1)]

        # Honest growth, limited to the invites not already given to the attacker.
        wanted = max(1, int(non_attackers * self.non_attacker_growth_rate))
        still_available = non_attackers * self.invites_per_user - bribed_invites
        honest_invites = min(wanted, still_available)
        newcomers.extend(User() for _ in range(honest_invites - 1))
        return newcomers

    def run_period(self):
        """Run one verification period and drop everyone left unverified.

        Steps: reset all verification flags, pair the existing users, create
        invited accounts and assign each to a random pair as its court,
        resolve the pairs (mixed attacker/honest pairs cost the attacker
        ``fv_cost`` per attempt and succeed with probability ``p_fv``,
        otherwise they are disputed), then let every court with a verified
        pair verify its user.
        """
        # Everybody has to earn verification again.
        for user in self.users:
            user.verified = False

        pairs = self._pair_up()
        newcomers = self._invite_new_users()
        self.users.extend(newcomers)

        # Each new account is judged by a randomly chosen pair. Without any
        # pair there is no court; such accounts stay unverified and are
        # dropped below (random.choice would raise on an empty list).
        courts: List['Court'] = []
        if pairs:
            for newcomer in newcomers:
                courts.append(Court(newcomer, random.choice(pairs)))

        # Pairs whose members are on the same side verify each other.
        mixed_pairs = []
        for pair in pairs:
            if pair.user1.is_attacker == pair.user2.is_attacker:
                pair.verify()
            else:
                mixed_pairs.append(pair)

        # In a mixed pair the attacker has to bribe/trick the honest member.
        for pair in mixed_pairs:
            self.attacker_cost += self.fv_cost
            if random.random() < self.p_fv:
                pair.verify()
            else:
                # Failed: both members are reassigned to courts.
                pair.dispute(pairs, courts)

        # A court (initial or created by a dispute) verifies its user exactly
        # when its pair ended up verified. One pass over the courts replaces
        # the repeated list-membership filters and yields the same result.
        for court in courts:
            if court.pair.verified:
                court.verify()

        # Remove unverified users.
        self.users = [u for u in self.users if u.verified]
def run_simulation(
    starting_non_attackers,
    p_fv,
    non_attacker_growth_rate,
    fv_cost,
    invites_per_user,
    periods,
) -> Tuple[List[float], List[float]]:
    """Run the simulation for ``periods`` periods.

    Args:
        starting_non_attackers: initial number of honest users.
        p_fv: probability that a bribe/trick succeeds.
        non_attacker_growth_rate: honest invites per honest user and period.
        fv_cost: attacker cost per bribe/trick attempt.
        invites_per_user: invites per user and period.
        periods: number of periods to simulate.

    Returns:
        ``(ratios, costs)`` with one entry per period: the share of verified
        accounts controlled by the attacker (0 when nobody is verified) and
        the cumulative attacker cost.
    """
    system = System(starting_non_attackers, p_fv, non_attacker_growth_rate, fv_cost, invites_per_user)
    ratios: List[float] = []
    costs: List[float] = []
    for _ in range(periods):
        system.run_period()
        verified_users = [u for u in system.users if u.verified]
        verified_attackers = sum(1 for u in verified_users if u.is_attacker)
        if verified_users:
            ratios.append(verified_attackers / len(verified_users))
        else:
            ratios.append(0.0)
        costs.append(system.attacker_cost)
    return ratios, costs
# Simulation parameters
starting_non_attackers = 1000
p_fv = 0.01  # Probability of successful bribe/trick
non_attacker_growth_rate = 0.05  # Growth rate of non-attacker accounts per existing non-attacker accounts
fv_cost = 10  # Attacker cost per bribe/trick attempt
invites_per_user = 2  # Per period
periods = 6

# Run simulation
ratios, costs = run_simulation(starting_non_attackers, p_fv, non_attacker_growth_rate, fv_cost, invites_per_user, periods)

# Plot results: attacker ratio on the left axis, cumulative cost on the right.
fig, ax1 = plt.subplots(figsize=(12, 6))
ax1.set_xlabel('Periods')
ax1.set_ylabel('Ratio of Attacker-Controlled Verified Accounts', color='tab:blue')
ax1.plot(range(1, periods + 1), ratios, color='tab:blue')
ax1.set_ylim(0, 1)
ax1.tick_params(axis='y', labelcolor='tab:blue')
ax1.axhline(y=1/2, color='r', linestyle='--', label='1/2 Threshold')
ax1.set_xticks(range(1, periods + 1))
ax1.set_xticklabels(range(1, periods + 1))

ax2 = ax1.twinx()  # instantiate a second axes that shares the same x-axis
ax2.set_ylabel('Cumulative Attacker Costs', color='tab:orange')
ax2.plot(range(1, periods + 1), costs, color='tab:orange')
ax2.tick_params(axis='y', labelcolor='tab:orange')

plt.title('Sybil Attack Simulation with Costs')
fig.tight_layout()  # otherwise the right y-label is slightly clipped
plt.grid(True)
plt.show()

print(f"Final attacker ratio: {ratios[-1]:.2f}")
# Periods are numbered from 1. None (not 0) marks "never reached", so a
# majority in the very first period is not mistaken for a miss.
periods_to_majority = next(
    (period for period, ratio in enumerate(ratios, start=1) if ratio > 1/2),
    None,
)
print(f"Periods to reach majority: {periods_to_majority if periods_to_majority is not None else 'Not reached'}")
# costs[k - 1] is the cumulative cost at the end of period k.
cost_to_majority = costs[periods_to_majority - 1] if periods_to_majority is not None else 0
print(f"Cost to reach majority: {cost_to_majority:.2f}")
print(f"Total attacker cost: {costs[-1]:.2f}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment