Skip to content

Instantly share code, notes, and snippets.

@jcran
Forked from tetrillard/rss_hackerone_hacktivity.py
Last active November 13, 2024 03:33
Show Gist options
  • Save jcran/197a1703c9f59ca4cf43ad652cc0c600 to your computer and use it in GitHub Desktop.
HackerOne Hacktivity - recent url monitor
import asyncio
import json
import logging
import random
import re
import time

import requests
#
# including these methods as wrappers around requests method
#
async def fetch_http_content(url: str, headers: dict = None, max_retries: int = 3):
    """GET *url*, retrying on SSL and connection errors.

    Args:
        url: The URL to request.
        headers: Optional request headers.  A random User-Agent is added
            when one is not supplied.
        max_retries: Number of attempts before giving up.

    Returns:
        dict with keys ``url``, ``code``, ``content``, ``headers`` on HTTP 200.

    Raises:
        Exception: after ``max_retries`` failed attempts.
    """
    retry_count = 0
    if headers is None:
        headers = {}
    # Add a random user agent if not provided.
    # NOTE(review): get_random_user_agent() is not defined in this snippet --
    # confirm it is available at module scope.
    if "User-Agent" not in headers:
        headers["User-Agent"] = get_random_user_agent()
    while retry_count < max_retries:
        try:
            # NOTE: requests.get() is blocking; acceptable for a single-task
            # script, but it stalls the event loop under real concurrency.
            response = requests.get(url, headers=headers)
        except requests.exceptions.SSLError as e:
            logging.warning(
                f"An SSL error occurred while requesting {url}: {e}. Sleeping 3 min."
            )
            # Bug fix: original used time.sleep(180), which blocks the whole
            # event loop inside an async function (and `time` was never
            # imported).  Use the async sleep instead.
            await asyncio.sleep(180)
            retry_count += 1
            continue
        except requests.exceptions.ConnectionError as e:
            logging.warning(
                f"A connection error occurred while requesting {url}: {e}. "
                "Remote end closed connection without response. Sleeping 2 min."
            )
            await asyncio.sleep(120)
            retry_count += 1
            continue
        if response.status_code == 200:
            return {
                "url": url,
                "code": response.status_code,
                "content": response.text,
                "headers": response.headers,
            }
        # Non-200 response: short randomized backoff before retrying.
        retry_count += 1
        await asyncio.sleep(random.uniform(1, 5))  # backoff
    raise Exception(
        f"Failed to fetch data from the URL: {url} after {max_retries} retries"
    )
async def post_http_content(url: str, json_data: dict = None, headers: dict = None, max_retries: int = 3):
    """POST *json_data* to *url*, retrying on SSL and connection errors.

    Args:
        url: The URL to request.
        json_data: JSON-serializable request body.
        headers: Optional request headers.  A random User-Agent is added
            when one is not supplied.
        max_retries: Number of attempts before giving up.

    Returns:
        dict with keys ``url``, ``code``, ``content``, ``headers`` on HTTP 200.

    Raises:
        Exception: after ``max_retries`` failed attempts.
    """
    retry_count = 0
    if headers is None:
        headers = {}
    # Add a random user agent if not provided.
    # NOTE(review): get_random_user_agent() is not defined in this snippet --
    # confirm it is available at module scope.
    if "User-Agent" not in headers:
        headers["User-Agent"] = get_random_user_agent()
    while retry_count < max_retries:
        try:
            # NOTE: requests.post() is blocking; acceptable for a single-task
            # script, but it stalls the event loop under real concurrency.
            response = requests.post(url, json=json_data, headers=headers)
        except requests.exceptions.SSLError as e:
            logging.warning(
                f"An SSL error occurred while requesting {url}: {e}. Sleeping 1 min."
            )
            # Bug fix: original used time.sleep(60), which blocks the whole
            # event loop inside an async function (and `time` was never
            # imported).  Use the async sleep instead.
            await asyncio.sleep(60)
            retry_count += 1
            continue
        except requests.exceptions.ConnectionError as e:
            logging.warning(
                f"A connection error occurred while requesting {url}: {e}. "
                "Remote end closed connection without response. Sleeping 2 min."
            )
            await asyncio.sleep(120)
            retry_count += 1
            continue
        if response.status_code == 200:
            return {
                "url": url,
                "code": response.status_code,
                "content": response.text,
                "headers": response.headers,
            }
        # Non-200 response: short randomized backoff before retrying.
        retry_count += 1
        await asyncio.sleep(random.uniform(1, 5))  # backoff
    raise Exception(
        f"Failed to fetch data from the URL: {url} after {max_retries} retries"
    )
class HackeroneHacktivityMonitor:
    """Monitor for recently disclosed HackerOne hacktivity reports."""

    async def fetch_entries(self) -> list:
        """Fetch latest HackerOne hacktivity entries.

        Scrapes a CSRF token from the public hacktivity page, issues the
        GraphQL search query, and collects the URL of every returned report.

        Returns:
            A list of report URL strings (e.g. ``https://hackerone.com/reports/867513``).

        Raises:
            RuntimeError: if the CSRF token cannot be found on the
                hacktivity page.
        """
        logging.info("Fetching HackerOne hacktivity entries")
        url = "https://hackerone.com/graphql"
        url_hacktivity = "https://hackerone.com/hacktivity"
        # If this doesn't work, check the requests tab in Chrome -- the query
        # may have changed.  Working as of 2024-11-13.
        json_data = {
            'operationName': 'HacktivitySearchQuery',
            'variables': {
                'queryString': 'disclosed:true',
                'size': 25,
                'from': 0,
                'sort': {
                    'field': 'votes',
                    'direction': 'DESC'
                },
                'product_area': 'hacktivity',
                'product_feature': 'overview'
            },
            'query': '''query HacktivitySearchQuery($queryString: String!, $from: Int, $size: Int, $sort: SortInput!) {\n me {\n id\n __typename\n }\n search(\n index: CompleteHacktivityReportIndex\n query_string: $queryString\n from: $from\n size: $size\n sort: $sort\n ) {\n __typename\n total_count\n nodes {\n __typename\n ... on HacktivityDocument {\n id\n _id\n reporter {\n id\n username\n name\n __typename\n }\n cve_ids\n cwe\n severity_rating\n upvoted: upvoted_by_current_user\n public\n report {\n id\n databaseId: _id\n title\n substate\n url\n disclosed_at\n report_generated_content {\n id\n hacktivity_summary\n __typename\n }\n __typename\n }\n votes\n team {\n id\n handle\n name\n medium_profile_picture: profile_picture(size: medium)\n url\n currency\n __typename\n }\n total_awarded_amount\n latest_disclosable_action\n latest_disclosable_activity_at\n submitted_at\n disclosed\n has_collaboration\n __typename\n }\n }\n }\n}\n''',
        }
        # Get CSRF token from hacktivity page.
        resp = await fetch_http_content(url_hacktivity)
        match = re.search(r'<meta name="csrf-token" content="([^"]*)" />', resp.get("content"))
        if match is None:
            # Bug fix: the original indexed re.findall(...)[0], which raised
            # an opaque IndexError whenever the page layout changed.
            raise RuntimeError("csrf-token meta tag not found on hacktivity page")
        token = match.group(1)
        # Make GraphQL request with token.
        resp = await post_http_content(url, json_data, headers={'x-csrf-token': token})
        j = json.loads(resp.get("content"))
        # Each node is a "HacktivityDocument" dict.  Example fields (as of
        # 2024-11-13): "reporter" {username, ...}, "cve_ids", "cwe",
        # "severity_rating", "report" {title, substate,
        # "url": "https://hackerone.com/reports/867513", disclosed_at, ...},
        # "votes", "team" {handle, name, url, currency},
        # "total_awarded_amount", "submitted_at", "disclosed".
        # Tip: append ".json" to a report URL to get it in JSON format.
        url_entries = []
        for report in j['data']['search']['nodes']:
            report_url = report.get("report").get("url")
            url_entries.append(report_url)
            print(f"Found HackerOne report URL: {report_url}")
        return url_entries
if __name__ == "__main__":
    # Script entry point: perform a single fetch cycle and exit.
    asyncio.run(HackeroneHacktivityMonitor().fetch_entries())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment