Created
April 16, 2019 17:44
-
-
Save mpurzynski/1b835b8e92bc3ce6662c42d43c0b8216 to your computer and use it in GitHub Desktop.
otx-misp.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--- | |
otx_api_key: {{ otx_api_key }} | |
misp_api_key: {{ misp_api_key }} | |
misp_api_url: {{ misp_api_url }} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse
import logging
from datetime import datetime, timedelta
from logging.handlers import SysLogHandler
from os import environ, fsync, stat, rename
from pathlib import Path
from sys import argv, stderr

from dateutil import parser as date_parser
from OTXv2 import OTXv2, IndicatorTypes
from pandas.io.json import json_normalize
from pymisp import ExpandedPyMISP, MISPEvent
from yaml import Loader, load, dump
def setup_logging(stream=stderr, level=logging.INFO):
    """Configure basic logging and return this module's logger.

    Args:
        stream: Stream passed to ``logging.basicConfig`` for log output.
        level: Level applied to the returned module logger.

    Returns:
        The ``logging.Logger`` named after this module, set to ``level``.
    """
    logging.basicConfig(
        stream=stream,
        datefmt="%H:%M:%S",
        format="[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s",
    )
    module_log = logging.getLogger(__name__)
    module_log.setLevel(level)
    return module_log
# Maps OTX file-hash indicator types to the matching add_hashes() keyword.
_HASH_KEYWORDS = {
    "FileHash-SHA256": "sha256",
    "FileHash-SHA1": "sha1",
    "FileHash-MD5": "md5",
}


def _build_event_info(pulse):
    """Return "author | name", or whichever of the two the pulse has, else None."""
    parts = [pulse[key] for key in ("author_name", "name") if key in pulse]
    return " | ".join(parts) if parts else None


def main():
    """Copy recently modified OTX pulses into MISP as events.

    Reads the API keys and MISP URL from a YAML file (``--config`` if given,
    otherwise the script path with its suffix replaced by ``.yml``), fetches
    the pulses modified in the last three days from OTX, creates one MISP
    event per pulse, and attaches the pulse's SHA256/SHA1/MD5 file-hash
    indicators to that event. Other indicator types are ignored.
    """
    global logger
    environ["TZ"] = "UTC"  # Override timezone so we know where we're at

    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config", help="Specify a configuration file")
    # nargs="?" lets -d work as a bare flag while still accepting the old
    # "-d <value>" form.
    parser.add_argument(
        "-d",
        "--debug",
        nargs="?",
        const=True,
        default=False,
        help="Print debug messages",
    )
    args = parser.parse_args()

    # Honour --config; fall back to "<script>.yml" next to the script.
    config_path = args.config or Path(argv[0]).with_suffix(".yml")
    with open(config_path, "r") as configyaml:
        # NOTE(review): Loader is PyYAML's full loader and can construct
        # arbitrary Python objects; the config file must be trusted.
        # Consider SafeLoader.
        config = load(configyaml, Loader=Loader) or {}  # empty file -> None

    otx_api_key = config.get("otx_api_key", "<OTXAPIKEY>")
    misp_api_key = config.get("misp_api_key", "<MISPAPIKEY>")
    misp_api_url = config.get("misp_api_url", "<APIKEY>")

    logger = setup_logging(level=logging.DEBUG if args.debug else logging.INFO)
    logger.debug("Started and initialized")

    otx = OTXv2(otx_api_key)
    pulses = otx.getall(modified_since=datetime.today() - timedelta(days=3))
    print(len(pulses))

    misp = ExpandedPyMISP(misp_api_url, misp_api_key, True)

    for pulse in pulses:
        event = MISPEvent()
        event.distribution = 0
        event.threat_level_id = 1
        event.analysis = 2

        info = _build_event_info(pulse)
        if info is not None:
            event.info = info

        try:
            dt = date_parser.parse(pulse["created"])
        except (KeyError, TypeError, ValueError, OverflowError):
            logger.error("Cannot parse Pulse 'created' date")
            dt = datetime.utcnow()
        event["date"] = dt

        # NOTE(review): assumes add_event returns an object with an ``id``
        # attribute; confirm what it returns when the server rejects the event.
        event_obj = misp.add_event(event)
        event_id = event_obj.id
        print("Event id: %s" % event_id)

        for indicator in pulse.get("indicators", []):
            hash_keyword = _HASH_KEYWORDS.get(indicator.get("type"))
            if hash_keyword is None:
                continue  # only file-hash indicators are exported
            misp.add_hashes(
                event_id,
                to_ids=True,
                comment=indicator.get("description", ""),
                **{hash_keyword: indicator["indicator"]}
            )


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment