Created
February 21, 2021 17:02
-
-
Save JusticeRage/f6fd2d003c13d85c4f864fd7f327382d to your computer and use it in GitHub Desktop.
API for manalyzer.org
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This program is free software: you can redistribute it and/or modify | |
it under the terms of the GNU General Public License as published by | |
the Free Software Foundation, either version 3 of the License, or | |
(at your option) any later version. | |
This program is distributed in the hope that it will be useful, | |
but WITHOUT ANY WARRANTY; without even the implied warranty of | |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
GNU General Public License for more details. | |
You should have received a copy of the GNU General Public License | |
along with this program. If not, see <https://www.gnu.org/licenses/>. | |
""" | |
# The full documentation for Manalyzer.org's API can be found at: | |
# https://docs.manalyzer.org/en/latest/interfacing.html | |
import json | |
import requests | |
import time | |
ENDPOINT = "https://manalyzer.org" | |
def get_task_status(task_id): | |
""" | |
Jobs submitted to the endpoint go inside a queue. Their status can be either queued, started, finished or failed. | |
:param task_id: The ID of the task to query. It is also the MD5 of the file analyzed. | |
:return: A JSON object which contains a "status" field, and optional data. | |
""" | |
r = requests.get(f"{ENDPOINT}/task/{task_id}") | |
return json.loads(r.text) | |
def get_report(md5): | |
""" | |
Obtains the results of an analysis for a given sample. | |
:param md5: The MD5 of the sample to query. | |
:return: The JSON output of Manalyze for that sample, or None if it wasn't analyzed on the website. | |
""" | |
r = requests.get(f"{ENDPOINT}/json/{md5}") | |
if r.status_code == 200: | |
return json.loads(r.text) | |
else: | |
return None | |
def submit_sample(path, blocking=True): | |
""" | |
Submits a sample to the endpoint for analysis. This function can be either blocking or non-blocking. It the first | |
case, it will wait until the endpoint has finished processing the file and will return the JSON report. Otherwise, | |
it will simply return a task identifier that can be used to query the status of the analysis. | |
:param path: The path to the PE to analyze. | |
:param blocking: Whether the function should wait until processing is finished or return immediately. | |
:return: | |
""" | |
f = {'file': open(path, "rb")} | |
r = requests.post(f"{ENDPOINT}/upload", files=f) | |
if r.status_code != 200: | |
return None | |
if not blocking: # Non blocking submission. Return the task ID. | |
return json.loads(r.text)["data"]["task_id"] | |
# Blocking submission: poll the API until results are available. | |
api_result = json.loads(r.text) | |
task_id = api_result["data"]["task_id"] | |
while api_result["status"] != "finished": | |
time.sleep(1) # Sleep 1 second between each query | |
api_result = get_task_status(task_id) | |
if api_result is None: | |
return None | |
if api_result["status"] == "failed": | |
print(f"Error: {api_result['data']['error_message']}") | |
return None | |
# The endpoint has finished processing the sample. | |
if api_result["data"]["task_result"]["manalyze_status"] == "failed": | |
print(api_result["data"]["task_result"]["error_message"]) | |
return None | |
else: # Get the results if there was no error | |
return get_report(task_id) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment