Twitter cryptoscam detection proof of concept. https://twitter.com/TinkerSec/status/961233575516389376
#!/usr/bin/env python3
# encoding: utf-8
# author: Lorenz Hübschle-Schneider
# This is really really simple. Twitter, you have no excuse for not doing something like this!
import codecs
import json
import re
from unicodedata import normalize

# Ethereum addresses: "0x" followed by 40 hex digits
eth_regex = re.compile("0x[a-fA-F0-9]{40}")
# Legacy Bitcoin addresses: Base58 strings starting with 1 or 3
# (the character class excludes the ambiguous 0, O, I and l)
btc_regex = re.compile("[13][a-km-zA-HJ-NP-Z1-9]{25,34}")
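# Illustrative strings these patterns match (examples added here, not part
# of the original gist):
#   eth_regex: 0x0123456789abcdef0123456789abcdef01234567
#   btc_regex: 1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa  (the genesis block address)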
# from: https://stackoverflow.com/a/32558749/3793885
def levenshteinDistance(s1, s2):
    if len(s1) > len(s2):
        s1, s2 = s2, s1
    distances = range(len(s1) + 1)
    for i2, c2 in enumerate(s2):
        distances_ = [i2 + 1]
        for i1, c1 in enumerate(s1):
            if c1 == c2:
                distances_.append(distances[i1])
            else:
                distances_.append(1 + min(distances[i1], distances[i1 + 1], distances_[-1]))
        distances = distances_
    return distances[-1]
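# Sanity check (illustrative example, not part of the original gist):
#   levenshteinDistance("kitten", "sitting") == 3
# (substitute k->s, substitute e->i, append g)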
# return edit distance between ascii-normalized versions of the two input strings
def normalized_distance(original, query):
    normalized_original = normalize('NFKD', original).encode('ascii', 'ignore')
    normalized_query = normalize('NFKD', query).encode('ascii', 'ignore')
    return levenshteinDistance(normalized_original, normalized_query)
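# This is what defeats the accented-lookalike trick: NFKD decomposes accented
# glyphs into a base letter plus combining marks, which the ascii/'ignore'
# encoding then drops. Illustrative example (not part of the original gist):
#   normalized_distance('Elon Musk', 'Elon Müsk') == 0
# because 'ü' normalizes to 'u' plus a combining diaeresis that is discarded.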
# compute a score for how likely a tweet is to be a cryptocurrency scam:
# 0.5 if the text contains an ETH or BTC address, plus up to 1.0 for how
# closely the sender's names mimic the original tweet's author, for a
# maximum score of 1.5 (pretty certain)
def classify_scam(tweet, original_tweet):
    score = 0.0
    text = tweet["full_text"]
    if eth_regex.search(text) or btc_regex.search(text):
        score += 0.5
    displayname_distance = normalized_distance(original_tweet["user"]["name"],
                                               tweet["user"]["name"])
    username_distance = normalized_distance(original_tweet["user"]["screen_name"],
                                            tweet["user"]["screen_name"])
    score += 1.0 / (displayname_distance + username_distance + 1)
    return score
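# Worked example (illustrative): a reply containing an ETH address (+0.5)
# from a display name that normalizes exactly to the original author's
# (distance 0), with a handle two edits away from the original, scores
#   0.5 + 1.0 / (0 + 2 + 1) ≈ 0.83,
# which is well above the 0.2 reporting threshold used below.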
if __name__ == '__main__':
    import sys
    if len(sys.argv) == 1:
        print('Usage: {} twarc-replies-dump.json'.format(sys.argv[0]))
        print('')
        print('This tool parses the output of "twarc replies <tweet-id>"')
        print('See https://github.com/DocNow/twarc for more information on twarc')
        sys.exit(1)
    filename = sys.argv[1]
    with codecs.open(filename, 'r', 'utf8') as inputfile:
        lines = inputfile.readlines()
    print('Read {} lines'.format(len(lines)))
    # the first line is expected to be the original tweet, the rest its replies
    original_tweet = json.loads(lines[0])
    suspects = []
    for line in lines[1:]:
        tweet = json.loads(line)
        score = classify_scam(tweet, original_tweet)
        if score > 0.2:
            suspects.append((score, tweet))
    # sort by score only: comparing the tweet dicts as tie-breakers would
    # raise a TypeError in Python 3 when two scores are equal
    for (score, tweet) in sorted(suspects, key=lambda pair: pair[0], reverse=True):
        print('Found a likely scammy tweet, score {}:'.format(score))
        print('\tfrom: {user} – {name}'.format(
            user=tweet["user"]["screen_name"],
            name=tweet["user"]["name"]))
        print('\ttext: {}'.format(tweet["full_text"]))
        print('---------------------------------------------------------------')
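# Example invocation (the script filename is hypothetical; twarc must be
# installed and configured with API credentials, see the URL above):
#   twarc replies 961083704230674438 > replies.json
#   python3 cryptoscam.py replies.json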
Here's the output when applied to the first 753 replies to https://twitter.com/elonmusk/status/961083704230674438: