Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save reanimat0r/3ca1d705cec4bb01e5d4753ee5d9393c to your computer and use it in GitHub Desktop.
Save reanimat0r/3ca1d705cec4bb01e5d4753ee5d9393c to your computer and use it in GitHub Desktop.
'''
Requirements:
sudo apt-get install python3
sudo apt-get install python3-pip
pip3 install python-twitter
pip3 install beautifulsoup4
pip3 install requests
'''
import os
import re
import sys
import bs4
import json
import twitter
import requests
import argparse
class AppConfig:
    """Application configuration loaded from a JSON file.

    Attributes: the Pulsedive API key, the four Twitter OAuth credentials,
    and the IOC whitelist.
    """

    def __init__(self):
        self.pulsedive_api_key = ""
        self.twitter_access_token = ""
        self.twitter_access_token_secret = ""
        self.twitter_consumer_key = ""
        self.twitter_consumer_secret = ""
        self.whitelist = ""

    @staticmethod
    def load_config(config_file):
        """Load configuration from *config_file* and return an AppConfig.

        When the default name "config.json" is passed, the file is resolved
        relative to this script's directory. On failure an error message is
        printed and the (unpopulated) AppConfig is still returned.
        """
        app_config = AppConfig()
        config = None
        if config_file == "config.json":
            config_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json")
        if os.path.isfile(config_file):
            config = app_config._read_from_disk(config_file)
        if not app_config._load_config_json(config):
            print("\r\n[!] An error occurred while attempting to load the configuration from: {0}".format(config_file))
        return app_config

    def _read_from_disk(self, filename):
        """Return the parsed JSON content of *filename*, or None if missing."""
        config_json = None
        if os.path.isfile(filename):
            with open(filename, "r") as input_config_file:
                config_json = json.load(input_config_file)
        else:
            print("\r\n[!] Config file not found: {0}".format(filename))
        return config_json

    def _load_config_json(self, config_json):
        """Populate attributes from the dict *config_json*.

        Returns True on success, False when *config_json* is None (bug fix:
        the original raised TypeError on `key in None`) or when any required
        key is absent.
        """
        if config_json is None:
            return False
        required_keys = [
            "pulsedive_api_key",
            "twitter_access_token",
            "twitter_access_token_secret",
            "twitter_consumer_key",
            "twitter_consumer_secret",
            "whitelist"
        ]
        for key_name in required_keys:
            if key_name not in config_json:
                print("\r\n[!] Invalid config file, missing attribute: {0}".format(key_name))
                return False
        self.pulsedive_api_key = config_json["pulsedive_api_key"]
        self.twitter_access_token = config_json["twitter_access_token"]
        self.twitter_access_token_secret = config_json["twitter_access_token_secret"]
        self.twitter_consumer_key = config_json["twitter_consumer_key"]
        self.twitter_consumer_secret = config_json["twitter_consumer_secret"]
        self.whitelist = config_json["whitelist"]
        return True
class TweetCollector:
    """Thin wrapper around the python-twitter search API."""

    def __init__(self, config):
        # tweet_mode="extended" returns full (untruncated) tweet text.
        self.twitter_api = twitter.Api(
            config.twitter_consumer_key,
            config.twitter_consumer_secret,
            config.twitter_access_token,
            config.twitter_access_token_secret,
            tweet_mode="extended"
        )

    def search(self, search_string):
        """Run a Twitter search and return the results as a list of dicts.

        *search_string* may be a complete raw query (starting with "q=") or
        a bare, URL-encoded query string. Returns [] when *search_string*
        is None (bug fix: the original left `results` unbound and raised
        NameError in that case). Exits the process on SSL errors, matching
        the rest of the script.
        """
        search_results = []
        results = []
        if search_string is not None:
            if search_string.startswith("q="):
                raw_query = "{0}".format(search_string)
            else:
                raw_query = "q={0}".format(search_string)
            try:
                results = self.twitter_api.GetSearch(raw_query=raw_query)
            except requests.exceptions.SSLError as e:
                print("\r\n[!] An error occurred while making the Twitter GetSearch api call with search string: {0}".format(search_string))
                print("\r\n[!] Exception Information:\r\n{0}\r\n".format(e))
                quit()
        for result in results:
            search_results.append(json.loads(result.AsJsonString()))
        return search_results
class TweetFilter:
    """Filters lists of tweet dicts (as produced by TweetCollector.search)."""

    def __init__(self):
        self.regex = RegexMatch()

    def contains_urls(self, search_results):
        """Return the tweets whose "urls" entries contain a valid URL."""
        tweets_with_urls = []
        for result in search_results:
            for url in result["urls"]:
                if url["expanded_url"] is not None and self.regex.url(url["expanded_url"]):
                    tweets_with_urls.append(result)
        return tweets_with_urls

    def contains_pastebin_urls(self, search_results):
        """Return the tweets that link to pastebin.com with a valid URL."""
        tweets_with_pastebin_urls = []
        for result in search_results:
            for url in result["urls"]:
                if url["expanded_url"] is not None and "pastebin.com/" in url["expanded_url"] and self.regex.url(url["expanded_url"]):
                    tweets_with_pastebin_urls.append(result)
        return tweets_with_pastebin_urls

    def get_original_tweet(self, search_results):
        """Map tweets to their originals.

        NOTE(review): the branch conditions look suspicious (a retweet is
        normally detected via a "retweeted_status" key, not a missing
        "retweet_count") — preserved as written, apart from the two fixes
        below; confirm against real API payloads.
        """
        original_tweets = []
        for result in search_results:
            if "retweet_count" not in result.keys():
                original_tweets.append(result["retweeted_status"])
            elif result["retweet_count"] != 0:  # bug fix: was `is not 0` (identity test on int)
                original_tweets.append(result)
        return original_tweets  # bug fix: the original had no return and yielded None
class ThreatCollector:
    """Collects IOCs by scraping pastebin pastes and querying Pulsedive."""

    def __init__(self, config):
        self.regex = RegexMatch()
        self.pulsedive_api_key = config.pulsedive_api_key

    # TODO Add support for file extensions and hashes
    def get_pastebin_iocs(self, pastebin_urls, recursive_search=False):
        """Scrape each pastebin URL and return the IP/URL indicators found.

        With recursive_search=True, pastebin links found inside a paste are
        scraped one level deeper. Exits the process on SSL errors.
        """
        ioc_list = []
        for url in pastebin_urls:
            if url is None or "pastebin.com/" not in url or not self.regex.url(url):
                continue
            if "pastebin.com/raw" in url:
                # The paste body is scraped from the <textarea> on the
                # non-raw page, so normalize raw links.
                url = url.replace("/raw", "")
            try:
                page = requests.get(url).text
                parsed_lines = bs4.BeautifulSoup(page, "html.parser").find("textarea").string.splitlines()
            except requests.exceptions.SSLError as e:
                print("\r\n[!] An error occurred while requesting the URL: {0}".format(url))
                print("\r\n[!] Exception Information:\r\n{0}\r\n".format(e))
                quit()
            for line in parsed_lines:
                for token in line.split():
                    if recursive_search and "pastebin.com/" in token.lower() and self.regex.url(token) and url != token:
                        # bug fix: recurse with a one-element list — the
                        # original passed the bare string, so the recursive
                        # call iterated its characters.
                        ioc_list.extend(self.get_pastebin_iocs([token]))
                    elif (self.regex.ipv4(token) or self.regex.url(token)) and url != token:
                        # bug fix: parenthesized — the original precedence
                        # admitted IP matches even when token == url, and a
                        # second recursive branch extended the list with
                        # nested lists.
                        ioc_list.append(token)
        return ioc_list

    def get_pulsedive_iocs(self, threat):
        """Query Pulsedive for *threat* (name or tid) and return IOC dicts."""
        iocs = []
        if threat is not None:
            url = self._prepare_pulsedive_url(threat)
            if url is not None:
                response_json = json.loads(requests.get(url).text)
                iocs = self._handle_pulsedive_response(response_json)
        return iocs

    def _prepare_pulsedive_url(self, threat):
        """Build the Pulsedive info URL for a threat id (int) or name (str)."""
        prepared_url = None
        query = None
        if isinstance(threat, int):
            query = "&get=links&tid={0}".format(threat)
        elif isinstance(threat, str):
            query = "&tname={0}".format(threat)
        if query is not None:
            prepared_url = "https://pulsedive.com/api/info.php?{0}&key={1}".format(query, self.pulsedive_api_key)
        return prepared_url

    def _handle_pulsedive_response(self, pulsedive_response):
        """Resolve a Pulsedive response to a list of IOC dicts (or None).

        A threat-name lookup returns a "tid", which is followed up with a
        second query for the linked indicators.
        """
        handled_response = None
        if pulsedive_response and "tid" in pulsedive_response:
            handled_response = self.get_pulsedive_iocs(pulsedive_response["tid"])
        elif pulsedive_response and pulsedive_response.get("results"):
            # bug fix (idiom): the original used `len(...) is not 0`
            handled_response = pulsedive_response["results"]
        return handled_response
class ThreatFilter:
    """Filters IOC lists against a whitelist and Pulsedive metadata."""

    def __init__(self, config):
        # An empty/absent whitelist means "no whitelisting".
        # bug fix (idiom): the original tested `len(...) is 0`.
        self.whitelist = config.whitelist if config.whitelist else []

    # TODO - add functions to manage whitelist
    def filter_whitelisted_iocs(self, ioc_list):
        """Return the set of indicators that match no whitelist entry.

        A whitelist entry matches by substring. bug fix: the original's
        `break` aborted the whole filtering loop at the first whitelisted
        indicator instead of merely skipping it.
        """
        filtered_iocs = set()
        for indicator in ioc_list:
            if not any(known_good in indicator for known_good in self.whitelist):
                filtered_iocs.add(indicator)
        return filtered_iocs

    def filter_pulsedive_iocs_by_type(self, pulsedive_indicators, filter_types):
        """Keep only indicator dicts whose "type" is one of *filter_types*."""
        return [indicator for indicator in pulsedive_indicators
                if indicator.get("type") in filter_types]

    def filter_pulsedive_iocs_by_risk(self, pulsedive_indicators, filter_risks):
        """Keep only indicator dicts whose "risk" is one of *filter_risks*."""
        return [indicator for indicator in pulsedive_indicators
                if indicator.get("risk") in filter_risks]
class RegexMatch:
    """Regex matchers for URLs (incl. IPv4/IPv6 hosts) and bare IPv4 addresses."""
    # TODO Add support for hashes and files with extensions

    def __init__(self):
        # Patterns kept as public attributes for compatibility; compiled
        # once here instead of being re-parsed on every call.
        self.url_regex = r"\b((?:https?://)?(?:(?:www\.)?(?:[\da-z\.-]+)\.(?:[a-z]{2,6})|(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)|(?:(?:[0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,7}:|(?:[0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,5}(?::[0-9a-fA-F]{1,4}){1,2}|(?:[0-9a-fA-F]{1,4}:){1,4}(?::[0-9a-fA-F]{1,4}){1,3}|(?:[0-9a-fA-F]{1,4}:){1,3}(?::[0-9a-fA-F]{1,4}){1,4}|(?:[0-9a-fA-F]{1,4}:){1,2}(?::[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:(?:(?::[0-9a-fA-F]{1,4}){1,6})|:(?:(?::[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(?::[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(?:ffff(?::0{1,4}){0,1}:){0,1}(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])|(?:[0-9a-fA-F]{1,4}:){1,4}:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])))(?::[0-9]{1,4}|[1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5])?(?:/[\w\.-]*)*/?)\b"
        self.ipv4_regex = r"\b(?:(?:1\d\d|2[0-5][0-5]|2[0-4]\d|0?[1-9]\d|0?0?\d)\.){3}(?:1\d\d|2[0-5][0-5]|2[0-4]\d|0?[1-9]\d|0?0?\d)\b"
        self._url_pattern = re.compile(self.url_regex)
        self._ipv4_pattern = re.compile(self.ipv4_regex)

    def url(self, object_to_filter):
        """Return True if *object_to_filter* begins with a URL match."""
        return bool(self._url_pattern.match(object_to_filter))

    def ipv4(self, object_to_filter):
        """Return True if *object_to_filter* begins with an IPv4 address."""
        return bool(self._ipv4_pattern.match(object_to_filter))
class OutputHandler:
    """Writes collected IOCs to stdout; file output is still a stub."""

    # TODO - ADD LOGIC TO WRITE TO FILE
    def to_file(self, output_file, output_format, threat_name):
        """Resolve the output path; the actual write is not implemented yet.

        The default name "output" is placed next to this script as
        "output.<format>"; any other name just gains the format extension.
        """
        if output_file == "output":
            output_file = "{0}.{1}".format(output_file, output_format)
            output_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), output_file)
        else:
            # bug fix: the original concatenated name and format with no "."
            output_file = "{0}.{1}".format(output_file, output_format)
        return None

    def to_stdout(self, object_to_output):
        """Write *object_to_output* to stdout as-is (no newline appended)."""
        sys.stdout.write(object_to_output)
        return None
if __name__ == "__main__":
    argument_parser = argparse.ArgumentParser(description="Twitter Threat Collection Bot")
    argument_parser.add_argument("-c", "--config_file", dest="config", default="config.json", help="Path to configuration file.")
    argument_parser.add_argument("-o", "--output_file", dest="output_file", default="output", help="Name of file to output results to.")
    argument_parser.add_argument("-f", "--output_format", dest="output_format", default="json", help="Format in which to write the output file.")
    arguments = argument_parser.parse_args()

    # Attempting to load the application configuration
    config = AppConfig.load_config(arguments.config)
    if config is None:
        quit()

    # Getting Twitter search results for '#emotet filter:links pastebin'
    tweet_collector = TweetCollector(config)
    twitter_search_results = tweet_collector.search("%23emotet%20filter%3Alinks%20pastebin")

    # Filtering tweets for pastebin links. Tweets are dicts (unhashable),
    # so bug fix: de-duplicate the expanded pastebin URLs themselves
    # instead of calling set() on the tweet dicts (TypeError).
    tweet_filter = TweetFilter()
    pastebin_tweets = tweet_filter.contains_pastebin_urls(twitter_search_results)
    unique_pastebin_urls = set()
    for tweet in pastebin_tweets:
        for url in tweet["urls"]:
            if url["expanded_url"] is not None and "pastebin.com/" in url["expanded_url"]:
                unique_pastebin_urls.add(url["expanded_url"])

    # Scraping the pastebin sites for Emotet Indicators of Compromise (IOCs).
    # bug fix: get_pastebin_iocs expects URLs, not whole tweet dicts.
    threat_collector = ThreatCollector(config)
    emotet_twitter_iocs = threat_collector.get_pastebin_iocs(unique_pastebin_urls, True)
    emotet_pulsedive_iocs = threat_collector.get_pulsedive_iocs("emotet")

    # Filtering IOCs
    aggregated_iocs = []
    threat_filter = ThreatFilter(config)
    filtered_emotet_pulsedive_iocs = threat_filter.filter_pulsedive_iocs_by_type(emotet_pulsedive_iocs, ['url', 'ip'])
    filtered_emotet_pulsedive_iocs = threat_filter.filter_pulsedive_iocs_by_risk(filtered_emotet_pulsedive_iocs, ['high', 'critical'])
    aggregated_iocs.extend(set(emotet_twitter_iocs))
    # bug fix: Pulsedive results are dicts (unhashable in a set); keep the
    # indicator value itself — presumably under the "indicator" key
    # (TODO confirm against the Pulsedive API response schema).
    aggregated_iocs.extend({ioc.get("indicator") for ioc in filtered_emotet_pulsedive_iocs if ioc.get("indicator")})
    filtered_aggregated_iocs = threat_filter.filter_whitelisted_iocs(aggregated_iocs)

    # Writing the de-duplicated indicators to stdout, one per line.
    output_handler = OutputHandler()
    for ioc in filtered_aggregated_iocs:
        output_handler.to_stdout("{0}\n".format(ioc))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment