Last active
May 10, 2024 14:33
-
-
Save 9b/11379392 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from passivetotal import PassiveTotal | |
api = 'your key' | |
pt = PassiveTotal(api) | |
pt.setLogging('DEBUG') | |
print pt.search('www.trendmicro-update.org') | |
print pt.classify('www.trendmicro-update.org', 'targeted') | |
print pt.tag('www.trendmicro-update.org', 'china', 'add') |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests, json, logging, sys | |
class PassiveTotal: | |
def __init__(self, apikey): | |
self.__apikey = apikey | |
self.__classifications = [ 'targeted', 'crime', 'benign', 'multiple' ] | |
self.__actions = [ 'add', 'remove' ] | |
self.__endpoint = 'https://www.passivetotal.org/api/' | |
self.__logger = logging.getLogger('PassiveTotal') | |
def setLogging(self, level): | |
logger = logging.getLogger('PassiveTotal') | |
if level == "INFO": | |
logger.setLevel(logging.INFO) | |
elif level == "WARN": | |
logger.setLevel(logging.WARN) | |
elif level == "DEBUG": | |
logger.setLevel(logging.DEBUG) | |
elif level == "ERROR": | |
logger.setLevel(logging.ERROR) | |
else: | |
pass | |
format = logging.Formatter('\033[1;32m%(levelname)-5s %(module)s:%(funcName)s():%(lineno)d %(asctime)s\033[0m| %(message)s') | |
shandler = logging.StreamHandler(sys.stdout) | |
shandler.setFormatter(format) | |
logger.addHandler(shandler) | |
return logger | |
def classify(self, value, classification): | |
url = self.__endpoint + 'classify' | |
if classification.lower() not in self.__classifications: | |
raise Exception("%s is not a valid classification type. Use %s." % ( classification, str(self.__classifications) ) ) | |
params = { 'apikey': self.__apikey, 'classification': classification.lower(), 'value': value } | |
response = requests.post(url, params=params) | |
self.__logger.debug("Response %d: %s %s" % (response.status_code, url, str(params))) | |
if response.status_code == 200: | |
return json.loads(response.content) | |
else: | |
self.__logger.error('Query failed: %s' % response.content) | |
raise Exception('Query failed') | |
def tag(self, value, tag, action): | |
if action.lower() not in self.__actions: | |
raise Exception("%s is not a valid tag action. Use %s." % ( action, str(self.__actions) ) ) | |
if action.lower() == 'add': | |
url = self.__endpoint + 'tag/add' | |
else: | |
url = self.__endpoint + 'tag/remove' | |
params = { 'apikey': self.__apikey, 'tag': tag, 'value': value } | |
response = requests.post(url, params=params) | |
self.__logger.debug("Response %d: %s %s" % (response.status_code, url, str(params))) | |
if response.status_code == 200: | |
return json.loads(response.content) | |
else: | |
self.__logger.error('Query failed: %s' % response.content) | |
raise Exception('Query failed') | |
def search(self, value): | |
url = self.__endpoint + 'passive' | |
params = { 'apikey': self.__apikey, 'value': value } | |
response = requests.post(url, params=params) | |
self.__logger.debug("Response %d: %s %s" % (response.status_code, url, str(params))) | |
if response.status_code == 200: | |
return json.loads(response.content) | |
else: | |
self.__logger.error('Query failed: %s' % response.content) | |
raise Exception('Query failed') |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""PassiveTotal CLI. | |
Usage: | |
ptcli.py query <indicator> [--raw] | |
ptcli.py classify <indicator> (targeted|crime|multiple|benign) [--bulk] | |
ptcli.py (add|remove) tag <indicator> <tag> [--bulk] | |
ptcli.py (-h | --help) | |
ptcli.py --version | |
Options: | |
-h --help Show this screen. | |
--version Show version. | |
--raw Dump the results in raw JSON. | |
--bulk Read values from a file instead of the CLI. | |
""" | |
from docopt import docopt | |
from passivetotal import PassiveTotal | |
import os, sys | |
API = 'YOUR KEY' | |
if __name__ == '__main__': | |
arguments = docopt(__doc__, version='PassiveTotal 1.0') | |
pt = PassiveTotal(API) | |
if arguments['query']: | |
response = pt.search(arguments['<indicator>']) | |
if response['success']: | |
if arguments['--raw']: | |
print response | |
else: | |
response = response['results'] | |
print "[*] Query:", response['focus'] | |
print "[*] First Seen:", response['firstSeen'] | |
print "[*] Last Seen:", response['lastSeen'] | |
print "[*] Resolve Count: ", response['rawCount'] | |
print "[*] Resolutions" | |
for resolve in response['resolutions']: | |
print "=>", resolve['resolve'], resolve['firstSeen'], resolve['lastSeen'], [ str(x) for x in resolve['source'] ] | |
else: | |
print response['error'] | |
if arguments['classify']: | |
if not arguments['--bulk']: | |
if arguments['targeted']: | |
response = pt.classify(arguments['<indicator>'], 'targeted') | |
elif arguments['crime']: | |
response = pt.classify(arguments['<indicator>'], 'crime') | |
elif arguments['multiple']: | |
response = pt.classify(arguments['<indicator>'], 'multiple') | |
else: | |
response = pt.classify(arguments['<indicator>'], 'benign') | |
if response['success']: | |
print response['message'] | |
else: | |
print response['error'] | |
else: | |
if os.path.exists(arguments['<indicator>']): | |
f = open(arguments['<indicator>'], 'r') | |
items = [ x.strip() for x in f.readlines() ] | |
f.close() | |
for item in items: | |
if arguments['targeted']: | |
response = pt.classify(item, 'targeted') | |
elif arguments['crime']: | |
response = pt.classify(item, 'crime') | |
elif arguments['multiple']: | |
response = pt.classify(item, 'multiple') | |
else: | |
response = pt.classify(item, 'benign') | |
if response['success']: | |
print item, response['message'] | |
else: | |
print item, response['error'] | |
if arguments['tag']: | |
if not arguments['--bulk']: | |
if arguments['add']: | |
response = pt.tag(arguments['<indicator>'], arguments['<tag>'], 'add') | |
else: | |
response = pt.tag(arguments['<indicator>'], arguments['<tag>'], 'remove') | |
if response['success']: | |
print response['message'] | |
else: | |
print response['error'] | |
else: | |
if os.path.exists(arguments['<indicator>']): | |
f = open(arguments['<indicator>'], 'r') | |
items = [ x.strip() for x in f.readlines() ] | |
f.close() | |
for item in items: | |
if arguments['add']: | |
response = pt.tag(item, arguments['<tag>'], 'add') | |
else: | |
response = pt.tag(item, arguments['<tag>'], 'remove') | |
if response['success']: | |
print item, response['message'] | |
else: | |
print item, response['error'] |
+1
Also, would you consider submitting this to pypi so I can install with pip
and do proper version management?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
If you put this in a proper repository with licensing information, I'd be eternally grateful. :)