Forked from taco-shellcode/osint-collector-python27.py
Created
October 25, 2019 20:55
-
-
Save reanimat0r/3ca1d705cec4bb01e5d4753ee5d9393c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
Requirements:
    sudo apt-get install python3
    sudo apt-get install python3-pip
    pip3 install python-twitter
    pip3 install beautifulsoup4
'''
import argparse
import json
import os
import re
import sys

import bs4
import requests
import twitter
class AppConfig():
    """Holds the API credentials and IOC whitelist read from a JSON config file."""

    def __init__(self):
        self.pulsedive_api_key = ""
        self.twitter_access_token = ""
        self.twitter_access_token_secret = ""
        self.twitter_consumer_key = ""
        self.twitter_consumer_secret = ""
        self.whitelist = ""

    @staticmethod
    def load_config(config_file):
        """Load and validate the JSON configuration.

        Returns a populated AppConfig on success, or None when the file
        cannot be read or a required key is missing. (The original passed a
        None config into _load_config_json on a missing file, which raised
        TypeError, and never returned the None that __main__ checks for.)
        """
        app_config = AppConfig()
        if config_file == "config.json":
            # Bare default name: resolve it relative to this script's directory.
            config_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json")
        config = app_config._read_from_disk(config_file)
        if config is None or not app_config._load_config_json(config):
            print("\r\n[!] An error occurred while attempting to load the configuration from: {0}".format(config_file))
            return None
        return app_config

    def _read_from_disk(self, filename):
        """Parse the JSON file at filename; return the dict, or None if absent."""
        config_json = None
        if os.path.isfile(filename):
            with open(filename, "r") as input_config_file:
                config_json = json.load(input_config_file)
        else:
            print("\r\n[!] Config file not found: {0}".format(filename))
        return config_json

    def _load_config_json(self, config_json):
        """Copy required keys from config_json onto self.

        Returns False (before mutating self) when any required key is missing.
        """
        required_keys = [
            "pulsedive_api_key",
            "twitter_access_token",
            "twitter_access_token_secret",
            "twitter_consumer_key",
            "twitter_consumer_secret",
            "whitelist"
        ]
        for key_name in required_keys:
            if key_name not in config_json:
                print("\r\n[!] Invalid config file, missing attribute: {0}".format(key_name))
                return False
        self.pulsedive_api_key = config_json["pulsedive_api_key"]
        self.twitter_access_token = config_json["twitter_access_token"]
        self.twitter_access_token_secret = config_json["twitter_access_token_secret"]
        self.twitter_consumer_key = config_json["twitter_consumer_key"]
        self.twitter_consumer_secret = config_json["twitter_consumer_secret"]
        self.whitelist = config_json["whitelist"]
        return True
class TweetCollector:
    """Thin wrapper around the python-twitter search API."""

    def __init__(self, config):
        # "extended" tweet mode returns the full (untruncated) tweet text.
        self.twitter_api = twitter.Api(
            config.twitter_consumer_key,
            config.twitter_consumer_secret,
            config.twitter_access_token,
            config.twitter_access_token_secret,
            tweet_mode="extended"
        )

    def search(self, search_string):
        """Run a Twitter search and return the results as a list of dicts.

        search_string may be a ready-made raw query ("q=...") or a bare
        query string. Returns an empty list when search_string is None
        (the original crashed with UnboundLocalError in that case).
        """
        search_results = []
        results = []
        if search_string is not None:
            # The two original branches differed only in the "q=" prefix.
            raw_query = search_string if search_string.startswith("q=") else "q={0}".format(search_string)
            try:
                results = self.twitter_api.GetSearch(raw_query=raw_query)
            except requests.exceptions.SSLError as e:
                print("\r\n[!] An error occurred while making the Twitter GetSearch api call with search string: {0}".format(search_string))
                print("\r\n[!] Exception Information:\r\n{0}\r\n".format(e))
                sys.exit(1)
        for result in results:
            search_results.append(json.loads(result.AsJsonString()))
        return search_results
class TweetFilter:
    """Filters raw Twitter search results (tweet dicts) by their URL content."""

    def __init__(self):
        self.regex = RegexMatch()

    def contains_urls(self, search_results):
        """Return the tweets carrying at least one regex-valid expanded URL."""
        tweets_with_urls = []
        for result in search_results:
            for url in result.get("urls", []):
                if url["expanded_url"] is not None and self.regex.url(url["expanded_url"]):
                    tweets_with_urls.append(result)
                    break  # one hit is enough; avoid duplicate appends
        return tweets_with_urls

    def contains_pastebin_urls(self, search_results):
        """Return the tweets that link to pastebin.com."""
        tweets_with_pastebin_urls = []
        for result in search_results:
            for url in result.get("urls", []):
                expanded = url["expanded_url"]
                if expanded is not None and "pastebin.com/" in expanded and self.regex.url(expanded):
                    tweets_with_pastebin_urls.append(result)
                    break  # one hit is enough; avoid duplicate appends
        return tweets_with_pastebin_urls

    def get_original_tweet(self, search_results):
        """Resolve retweets to the original tweet they point at.

        Bug fixes vs. the original: the function now actually returns its
        result, keys off "retweeted_status" (the original's key check could
        raise KeyError), and compares counts with != instead of `is not`.
        """
        original_tweets = []
        for result in search_results:
            if "retweeted_status" in result:
                # A retweet: keep the original tweet payload instead.
                original_tweets.append(result["retweeted_status"])
            elif result.get("retweet_count", 0) != 0:
                original_tweets.append(result)
        return original_tweets
class ThreatCollector:
    """Collects IP/URL indicators from pastebin pages and the pulsedive API."""

    def __init__(self, config):
        self.regex = RegexMatch()
        self.pulsedive_api_key = config.pulsedive_api_key

    # TODO Add support for file extensions and hashes
    def get_pastebin_iocs(self, pastebin_urls, recursive_search=False):
        """Scrape pastebin pages for IP/URL indicators.

        pastebin_urls is an iterable of pastebin.com URLs. When
        recursive_search is True, pastebin links found inside a paste are
        fetched one level deeper. Returns a flat list of indicator strings.
        """
        ioc_list = []
        for url in pastebin_urls:
            if url is None or "pastebin.com/" not in url or not self.regex.url(url):
                continue
            if "pastebin.com/raw" in url:
                # Normalize raw links to the regular page; the page's
                # <textarea> is what gets scraped below.
                url = url.replace("/raw", "")
            try:
                page = requests.get(url).text
            except requests.exceptions.RequestException as e:
                # Broadened from SSLError: any transport failure is fatal here.
                print("\r\n[!] An error occurred while requesting the URL: {0}".format(url))
                print("\r\n[!] Exception Information:\r\n{0}\r\n".format(e))
                sys.exit(1)
            textarea = bs4.BeautifulSoup(page, "html.parser").find("textarea")
            if textarea is None or textarea.string is None:
                # Paste removed/private: nothing to scrape on this page
                # (the original crashed with AttributeError here).
                continue
            for line in textarea.string.splitlines():
                for token in line.split():
                    if token == url:
                        continue
                    if recursive_search and "pastebin.com/" in token.lower() and self.regex.url(token):
                        # Bug fix: the original recursed with a bare string,
                        # which iterated it character by character; wrap the
                        # single URL in a list instead.
                        ioc_list.extend(self.get_pastebin_iocs([token]))
                    elif self.regex.ipv4(token) or self.regex.url(token):
                        # Bug fix: the original's `a or b and c` precedence
                        # let the source URL itself through when it matched
                        # the IPv4 regex; the url-equality guard above now
                        # applies to both regex branches.
                        ioc_list.append(token)
        return ioc_list

    def get_pulsedive_iocs(self, threat):
        """Query pulsedive for a threat (name or numeric tid); return its IOC list."""
        iocs = []
        if threat is not None:
            url = self._prepare_pulsedive_url(threat)
            if url is not None:
                response_json = json.loads(requests.get(url).text)
                iocs = self._handle_pulsedive_response(response_json)
        return iocs

    def _prepare_pulsedive_url(self, threat):
        """Build the info.php URL for a tid (int) or threat name (str); else None."""
        query = None
        if isinstance(threat, int):
            query = "&get=links&tid={0}".format(threat)
        elif isinstance(threat, str):
            query = "&tname={0}".format(threat)
        if query is None:
            return None
        return "https://pulsedive.com/api/info.php?{0}&key={1}".format(query, self.pulsedive_api_key)

    def _handle_pulsedive_response(self, pulsedive_response):
        """Turn a pulsedive response into results, following a tid redirect once."""
        handled_response = None
        if pulsedive_response and "tid" in pulsedive_response:
            # Name lookup returned a threat id: re-query for its linked IOCs.
            handled_response = self.get_pulsedive_iocs(pulsedive_response["tid"])
        elif pulsedive_response and len(pulsedive_response.get("results", [])) != 0:
            handled_response = pulsedive_response["results"]
        return handled_response
class ThreatFilter:
    """Filters IOC lists against a whitelist and pulsedive metadata."""

    def __init__(self, config):
        # The config default for whitelist is "" rather than []; normalize
        # any empty value to an empty list. (Also drops the original's
        # `len(...) is 0` identity comparison, which is undefined behavior
        # for general ints.)
        self.whitelist = config.whitelist if config.whitelist else []

    # TODO - add functions to manage whitelist
    def filter_whitelisted_iocs(self, ioc_list):
        """Return the unique IOC strings containing no whitelisted substring, as a set."""
        return {
            indicator for indicator in ioc_list
            if not any(known_good in indicator for known_good in self.whitelist)
        }

    def filter_pulsedive_iocs_by_type(self, pulsedive_indicators, filter_types):
        """Return the indicator dicts whose "type" is one of filter_types."""
        return [
            indicator for indicator in pulsedive_indicators
            if indicator.get("type") in filter_types
        ]

    def filter_pulsedive_iocs_by_risk(self, pulsedive_indicators, filter_risks):
        """Return the indicator dicts whose "risk" is one of filter_risks."""
        return [
            indicator for indicator in pulsedive_indicators
            if indicator.get("risk") in filter_risks
        ]
class RegexMatch:
    """Regex helpers for classifying strings as URLs or IPv4 addresses."""
    # TODO Add support for hashes and files with extensions

    def __init__(self):
        # Matches optional-scheme URLs with a domain, IPv4, or IPv6 host,
        # plus an optional port and path.
        self.url_regex = r"\b((?:https?://)?(?:(?:www\.)?(?:[\da-z\.-]+)\.(?:[a-z]{2,6})|(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)|(?:(?:[0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,7}:|(?:[0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,5}(?::[0-9a-fA-F]{1,4}){1,2}|(?:[0-9a-fA-F]{1,4}:){1,4}(?::[0-9a-fA-F]{1,4}){1,3}|(?:[0-9a-fA-F]{1,4}:){1,3}(?::[0-9a-fA-F]{1,4}){1,4}|(?:[0-9a-fA-F]{1,4}:){1,2}(?::[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:(?:(?::[0-9a-fA-F]{1,4}){1,6})|:(?:(?::[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(?::[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(?:ffff(?::0{1,4}){0,1}:){0,1}(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])|(?:[0-9a-fA-F]{1,4}:){1,4}:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])))(?::[0-9]{1,4}|[1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5])?(?:/[\w\.-]*)*/?)\b"
        self.ipv4_regex = r"\b(?:(?:1\d\d|2[0-5][0-5]|2[0-4]\d|0?[1-9]\d|0?0?\d)\.){3}(?:1\d\d|2[0-5][0-5]|2[0-4]\d|0?[1-9]\d|0?0?\d)\b"
        # Compile once: these predicates run per-token inside scraping loops.
        self._url_pattern = re.compile(self.url_regex)
        self._ipv4_pattern = re.compile(self.ipv4_regex)

    def url(self, object_to_filter):
        """Return True when the string begins with something URL-shaped."""
        return self._url_pattern.match(object_to_filter) is not None

    def ipv4(self, object_to_filter):
        """Return True when the string begins with a dotted-quad IPv4 address."""
        return self._ipv4_pattern.match(object_to_filter) is not None
class OutputHandler:
    """Writes collected IOCs to stdout (file output is not yet implemented)."""

    # TODO - ADD LOGIC TO WRITE TO FILE
    def to_file(self, output_file, output_format, threat_name):
        """Compute the target filename for the (unimplemented) file writer.

        Currently only builds the path; nothing is written. Always returns None.
        """
        if output_file == "output":
            # Default name: place "output.<format>" next to this script.
            output_file = os.path.join(
                os.path.dirname(os.path.abspath(__file__)),
                "{0}.{1}".format(output_file, output_format)
            )
        else:
            # Bug fix: the original concatenated name and format with no "."
            # separator in this branch.
            output_file = "{0}.{1}".format(output_file, output_format)
        return None

    def to_stdout(self, object_to_output):
        """Write object_to_output to stdout verbatim (no newline is appended)."""
        sys.stdout.write(object_to_output)
        return None
if __name__ == "__main__":
    argument_parser = argparse.ArgumentParser(description="Twitter Threat Collection Bot")
    argument_parser.add_argument("-c", "--config_file", dest="config", default="config.json", help="Path to configuration file.")
    argument_parser.add_argument("-o", "--output_file", dest="output_file", default="output", help="Name of file to output results to.")
    argument_parser.add_argument("-f", "--output_format", dest="output_format", default="json", help="Format in which to write the output file.")
    arguments = argument_parser.parse_args()

    # Attempting to load the application configuration
    config = AppConfig.load_config(arguments.config)
    if config is None:
        sys.exit(1)

    # Getting Twitter search results for '#emotet filter:links pastebin'
    tweet_collector = TweetCollector(config)
    twitter_search_results = tweet_collector.search("%23emotet%20filter%3Alinks%20pastebin")

    # Filtering Tweets for pastebin URLs. Bug fix: tweet dicts are unhashable,
    # so the original set(pastebin_tweets) raised TypeError; dedupe on the
    # expanded URLs instead, which is also what get_pastebin_iocs expects
    # (URL strings, not tweet dicts).
    tweet_filter = TweetFilter()
    pastebin_tweets = tweet_filter.contains_pastebin_urls(twitter_search_results)
    unique_pastebin_urls = set()
    for tweet in pastebin_tweets:
        for url in tweet.get("urls", []):
            expanded = url.get("expanded_url")
            if expanded is not None and "pastebin.com/" in expanded:
                unique_pastebin_urls.add(expanded)

    # Scraping the returned pastebin sites for Emotet Indicators of Compromise (IOCs)
    threat_collector = ThreatCollector(config)
    emotet_twitter_iocs = threat_collector.get_pastebin_iocs(unique_pastebin_urls, True)
    emotet_pulsedive_iocs = threat_collector.get_pulsedive_iocs("emotet")

    # Filtering IOCs
    threat_filter = ThreatFilter(config)
    filtered_emotet_pulsedive_iocs = threat_filter.filter_pulsedive_iocs_by_type(emotet_pulsedive_iocs, ['url', 'ip'])
    filtered_emotet_pulsedive_iocs = threat_filter.filter_pulsedive_iocs_by_risk(filtered_emotet_pulsedive_iocs, ['high', 'critical'])

    aggregated_iocs = []
    aggregated_iocs.extend(set(emotet_twitter_iocs))
    # Bug fix: pulsedive results are dicts (unhashable, and the whitelist
    # filter does substring tests on strings), so extract the raw indicator
    # value — pulsedive exposes it under the "indicator" key per its API.
    aggregated_iocs.extend({ioc.get("indicator") for ioc in filtered_emotet_pulsedive_iocs if ioc.get("indicator")})
    filtered_aggregated_iocs = threat_filter.filter_whitelisted_iocs(aggregated_iocs)

    output_handler = OutputHandler()
    for ioc in filtered_aggregated_iocs:
        # One IOC per line (to_stdout itself appends no newline).
        output_handler.to_stdout("{0}\r\n".format(ioc))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment