Created
July 15, 2013 14:49
-
-
Save TkTech/6000549 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
"""clean_db.py | |
Helps to immunize registration spam from a MediaWiki installation. | |
Usage: | |
clean_db.py scan <dbstring> [options] | |
clean_db.py bans <dbstring> | |
Options: | |
-h, --help Show this message. | |
--multiple-accounts Only consider IPs that have signed up | |
for multiple accounts. | |
--days-ago=<days> Only consider IPs that are at least | |
<days> old. | |
--ban Bans matching IPs. | |
""" | |
import sys | |
import datetime | |
from collections import defaultdict | |
from docopt import docopt | |
from sqlalchemy import * | |
from sqlalchemy.orm import sessionmaker | |
from sqlalchemy.ext.declarative import ( | |
declarative_base, | |
DeferredReflection | |
) | |
from sqlalchemy.orm.exc import MultipleResultsFound | |
# strftime/strptime pattern for the 14-digit ``YYYYMMDDHHMMSS`` timestamps
# used by the MediaWiki tables this script reads and writes.
_MW_TS_FORMAT = '%Y%m%d%H%M%S'


def to_mw_ts(dt):
    """Format a ``datetime`` as a ``YYYYMMDDHHMMSS`` timestamp string."""
    return dt.strftime(_MW_TS_FORMAT)


def from_mw_ts(ts):
    """Parse a ``YYYYMMDDHHMMSS`` timestamp into a naive ``datetime``.

    ``ts`` may be text or ``bytes``; binary database columns can come back
    as ``bytes``, which ``strptime`` would otherwise reject.

    Raises:
        ValueError: if ``ts`` does not match the 14-digit format.
    """
    if isinstance(ts, bytes):
        ts = ts.decode('ascii')
    return datetime.datetime.strptime(ts, _MW_TS_FORMAT)
def ip_to_hex(ip):
    """Return the zero-padded hexadecimal form of a dotted-quad IPv4 address.

    Each dot-separated octet becomes two hex digits, so ``'192.168.0.1'``
    yields ``'C0A80001'``. Upper-case digits are used because MediaWiki
    itself stores block range bounds as upper-case hex; lower-case values
    would not compare equal to the ones MediaWiki generates.

    Raises:
        ValueError: if any dot-separated part is not a decimal integer
            (for example when given an IPv6 address).
    """
    return ''.join(
        '{:02X}'.format(int(octet))
        for octet in ip.split('.')
    )
# Shared declarative base for every mapped class below. Using
# DeferredReflection as the base class postpones table reflection until
# ``Base.prepare(engine)`` is called (done in ``main``).
Base = declarative_base(
    cls=DeferredReflection,
)
class User(Base):
    """Mapped row of the ``wiki_user`` table.

    Only ``user_id`` is declared explicitly; the other attributes this
    script reads (``user_editcount``, ``user_registration``, ``user_name``)
    are expected to be supplied by table reflection.
    """
    __tablename__ = 'wiki_user'

    user_id = Column('user_id', Integer, primary_key=True)

    def user_groups(self, session):
        """Return all ``ug_group`` rows recorded for this user.

        Each element is a result row exposing a ``ug_group`` attribute; the
        list is empty when the user belongs to no group.
        """
        return session.query(UserGroups.ug_group).filter(
            UserGroups.ug_user == self.user_id
        ).all()

    @property
    def registered(self):
        """The ``user_registration`` timestamp parsed into a ``datetime``."""
        return from_mw_ts(self.user_registration)

    def ip(self, session):
        """Return the ``rc_ip`` from this user's recent-changes row.

        Returns ``None`` when the user has no recent-changes row, and also
        when there is more than one: ``scalar()`` raises
        ``MultipleResultsFound`` in that case and the ambiguity is treated
        as "no usable IP".
        """
        try:
            return session.query(RecentChanges.rc_ip).filter(
                RecentChanges.rc_user == self.user_id
            ).scalar()
        except MultipleResultsFound:
            return None
class RecentChanges(Base):
    """Mapped row of ``wiki_recentchanges``.

    No columns are declared here; the ones queried elsewhere in this file
    (``rc_ip``, ``rc_user``) are expected to come from table reflection.
    """

    __tablename__ = 'wiki_recentchanges'
class UserGroups(Base):
    """Mapped row of ``wiki_user_groups``: one (group, user) membership."""

    __tablename__ = 'wiki_user_groups'

    # Both columns together form the primary key.
    ug_group = Column('ug_group', VARBINARY, primary_key=True)
    ug_user = Column(
        'ug_user', Integer, ForeignKey('wiki_user.user_id'), primary_key=True
    )
class IPBlocks(Base):
    """Mapped row of ``wiki_ipblocks``.

    No columns are declared here; the ``ipb_*`` attributes used elsewhere in
    this file are expected to come from table reflection.
    """

    __tablename__ = 'wiki_ipblocks'
def _to_text(value):
    """Decode a ``bytes`` column value to text; pass other values through."""
    if isinstance(value, bytes):
        return value.decode('utf-8', 'replace')
    return value


def _do_scan(args, session):
    """Collect the IPs of zero-edit accounts and optionally ban them.

    Args:
        args: the docopt argument dict. Reads ``--days-ago``,
            ``--multiple-accounts`` and ``--ban``.
        session: SQLAlchemy session used for all queries and inserts.

    Accounts are skipped when they are in the ``confirmed`` group, when they
    are younger than ``--days-ago`` days, or when no single IP can be
    determined for them. With ``--ban`` an ``IPBlocks`` row is inserted (and
    committed) for every remaining IP that is not already blocked.

    NOTE(review): without ``--ban`` the matches are computed but nothing is
    printed or returned.
    """
    min_age_days = int(args['--days-ago']) if args['--days-ago'] else None
    # MediaWiki timestamps are UTC, so compare against (and write) UTC rather
    # than local time. A naive datetime is used to match ``User.registered``.
    now = datetime.datetime.utcnow()

    # All users that have never made an edit.
    user_q = session.query(User).filter(
        User.user_editcount == 0
    )

    ips = defaultdict(lambda: dict(count=0, users=list()))
    for user in user_q:
        # Make sure this isn't a confirmed user. The group column is binary
        # and may come back as bytes, which never equals a str literal.
        groups = user.user_groups(session)
        if any(_to_text(g.ug_group) == 'confirmed' for g in groups):
            continue

        # Make sure the account is at least X days old.
        if min_age_days is not None:
            if (now - user.registered).days < min_age_days:
                continue

        ip = _to_text(user.ip(session))
        if not ip:
            continue

        ips[ip]['users'].append(user.user_name)
        ips[ip]['count'] += 1

    if args['--multiple-accounts']:
        final_ips = {k: v for k, v in ips.items() if v['count'] > 1}
    else:
        final_ips = dict(ips)

    if not args['--ban']:
        return

    for ip in final_ips:
        already_banned = session.query(IPBlocks).filter(
            IPBlocks.ipb_address == ip
        ).first()
        if already_banned is not None:
            # This IP is already banned, we can forget about it.
            print('Skipping {ip}.'.format(ip=ip))
            continue

        try:
            ip_hex = ip_to_hex(ip)
        except ValueError:
            # ip_to_hex only understands dotted-quad IPv4; don't abort the
            # whole run because of one address it cannot encode.
            print('Skipping {ip} (not an IPv4 address).'.format(ip=ip))
            continue

        session.add(IPBlocks(
            ipb_address=ip,
            ipb_user=0,
            ipb_by=1,
            ipb_by_text='TkTech',
            ipb_reason='Spam',
            ipb_timestamp=to_mw_ts(datetime.datetime.utcnow()),
            ipb_auto=0,
            ipb_anon_only=0,
            ipb_create_account=1,
            ipb_enable_autoblock=0,
            ipb_expiry='infinity',
            ipb_range_start=ip_hex,
            ipb_range_end=ip_hex,
            ipb_deleted=0,
            ipb_block_email=0,
            ipb_allow_usertalk=0,
            ipb_parent_block_id=None
        ))
        # Commit per ban so the message below reflects persisted state.
        session.commit()
        print('Banned {ip}'.format(ip=ip))
def _do_bans(args, session):
    """Print the set of distinct ``ipb_address`` values in the block table.

    ``args`` is not used by this command.
    """
    rows = session.query(IPBlocks.ipb_address).all()
    banned = {row[0] for row in rows}
    print(banned)
def main(argv):
    """Parse the command line and run the requested sub-command.

    Args:
        argv: full argument vector including the program name at index 0
            (i.e. shaped like ``sys.argv``).

    Returns:
        ``None``, which ``sys.exit`` treats as a successful exit status.
    """
    # Honour the caller-supplied argv instead of silently re-reading sys.argv.
    args = docopt(__doc__, argv=argv[1:])

    engine = create_engine(args['<dbstring>'])
    # Reflect the tables behind the DeferredReflection-based models.
    Base.prepare(engine)

    Session = sessionmaker(bind=engine, autocommit=False)
    session = Session()
    try:
        if args['scan']:
            _do_scan(args, session)
        elif args['bans']:
            _do_bans(args, session)
    finally:
        # Release the connection even if a sub-command raises.
        session.close()
# Script entry point: hand the process arguments to main() and use its
# return value as the exit status.
if __name__ == '__main__':
    raise SystemExit(main(sys.argv))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment