Created
February 15, 2022 08:07
-
-
Save csiebler/553e0cd89dce38ae88d9304bf1fa33c0 to your computer and use it in GitHub Desktop.
Test Read API performance using different methodologies
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import io | |
import logging | |
import threading | |
import time | |
import concurrent.futures | |
import Levenshtein as lev | |
from datetime import datetime | |
# --- Run setup -------------------------------------------------------------
# Two CSVs named after the moment the run started (append mode):
# one row per document, and one row per batch summary.
run_started = datetime.now()
stamp = run_started.strftime("%Y-%m-%d-%H-%M-%S")

out = open(stamp + '.csv', "a")
out.write("method, date, image size, processing duration\n")

out_summary = open(stamp + '_summary.csv', "a")
out_summary.write("method, date, image size, total processing duration\n")

n = 10        # Number of documents per thread
threads = 10  # Number of parallel threads

# If you want to test different document resolutions, put them in here,
# use SAS URLs or set up Managed Identity
urls = {
    '500': 'https://csperftesting.blob.core.windows.net/images/500.jpg.....',
    '1000': 'https://csperftesting.blob.core.windows.net/images/1000.jpg.....',
    '2000': 'https://csperftesting.blob.core.windows.net/images/2000.jpg....',
    '3000': 'https://csperftesting.blob.core.windows.net/images/3000.jpg.....',
}

# Enter your resource details here, give one key per thread
endpoint_url = "https://westeurope.api.cognitive.microsoft.com/vision/v3.2/read/analyze?language=en&pages=1&readingOrder=natural"
keys = [
    "key1",
    "key2",
    "..",
    "...",
    "...",
    "..",
    "..",
    "....",
    "...",
    "....",
]
assert(len(keys) == threads), "Number of Read API keys must be equal to number of threads"
def stay_below_10_tps(start_time, end_time):
    """Sleep off the remainder of a 100 ms window so we never exceed 10 TPS.

    If the work between start_time and end_time took less than 0.1 s,
    sleep for the difference; otherwise return immediately.
    """
    remaining = 0.1 - (end_time - start_time)
    if remaining > 0:
        time.sleep(remaining)
def start_ocr(key, size):
    """Submit one async Read request for the image at resolution `size`.

    Returns the Operation-Location URL to poll for the result.
    Raises AssertionError if the service does not answer 202 Accepted.
    Throttles itself to stay under 10 transactions per second.
    """
    headers = {
        "Content-Type": "application/json",
        "Ocp-Apim-Subscription-Key": key
    }
    # Renamed from `json` to avoid shadowing the stdlib module name.
    payload = {
        "url": urls[size]
    }
    submitted_at = time.time()
    result = requests.post(endpoint_url, json=payload, headers=headers)
    assert result.status_code == 202, result.text
    stay_below_10_tps(submitted_at, time.time())
    return result.headers['Operation-Location']
def wait_for_results(key, location_url):
    """Poll the Read operation at `location_url` until it reaches a terminal state.

    Returns the final JSON response on success.
    Raises AssertionError (carrying the response) if the operation ends in
    any state other than 'succeeded'.

    Bug fix: the original looped `while status != "succeeded"`, which spins
    forever when the service reports 'failed' — the failure assert below
    was unreachable. We now exit the loop on any terminal state.
    """
    headers = {
        "Content-Type": "application/json",
        "Ocp-Apim-Subscription-Key": key
    }
    status = "running"
    response = None
    # 'notStarted' and 'running' are the non-terminal Read API states;
    # anything else ('succeeded', 'failed') ends the polling loop.
    while status in ("running", "notStarted"):
        start_time = time.time()
        response = requests.get(location_url, headers=headers).json()
        status = response['status']
        stay_below_10_tps(start_time, time.time())
    assert status == "succeeded", response
    return response
def run_sequential_test(size):
    """Option 1: submit a document and block for its result, n times in a row.

    Logs one CSV row per document and one summary row for the whole batch.
    """
    key = keys[0]
    batch_start = time.time()
    for _ in range(n):
        doc_start = time.time()
        operation_url = start_ocr(key, size)
        wait_for_results(key, operation_url)
        out.write(f"Option 1 - Sequential, {datetime.now()}, {size}, {time.time() - doc_start}\n")
    elapsed = time.time() - batch_start
    print(f"Sequential batch of {n} took {elapsed} seconds")
    out_summary.write(f"Option 1 - Sequential, {datetime.now()}, {size}, {elapsed}\n")
def run_sequential_optimized_test(size):
    """Option 2: submit all n documents first, then collect their results.

    Overlaps server-side processing with submission, so per-document wall
    time includes queueing but the batch finishes sooner than Option 1.
    """
    key = keys[0]
    batch_start = time.time()

    pending = []
    for _ in range(n):
        submitted_at = time.time()
        pending.append({'location_url': start_ocr(key, size),
                        'start_time': submitted_at})

    for item in pending:
        wait_for_results(key, item['location_url'])
        out.write(f"Option 2 - Optimized, {datetime.now()}, {size}, {time.time() - item['start_time']}\n")

    elapsed = time.time() - batch_start
    print(f"Sequential optimized batch of {n} took {elapsed} seconds")
    out_summary.write(f"Option 2 - Optimized, {datetime.now()}, {size}, {elapsed}\n")
def test_parallel(i):
    """Worker for Option 3: run one document end-to-end with the i-th key.

    NOTE(review): this reads the module-level `size` set by the driver loop
    at the bottom of the file rather than taking it as a parameter — it
    works only because the driver assigns `size` before fanning out.
    """
    key = keys[i % threads]
    started = time.time()
    operation_url = start_ocr(key, size)
    wait_for_results(key, operation_url)
    out.write(f"Option 3 - Multithreading, {datetime.now()}, {size}, {time.time() - started}\n")
def run_parallel_test(size):
    """Option 3: fan n documents out across `threads` worker threads.

    Logs a summary row with the total batch duration; per-document rows are
    written by test_parallel.

    Bug fix: executor.map returns a lazy iterator — the original never
    consumed it, so any exception raised inside test_parallel was silently
    swallowed. Materializing the result re-raises worker exceptions here.
    The explicit shutdown(wait=True) was also redundant: the `with` block
    already waits for all futures on exit.
    """
    start_time = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
        list(executor.map(test_parallel, range(n)))
    duration = time.time() - start_time
    print(f"Parallel batch of {n} took {duration} seconds")
    out_summary.write(f"Option 3 - Multithreading, {datetime.now()}, {size}, {duration}\n")
# Run all three strategies against every configured image resolution,
# then close the CSV outputs.
for size in urls:
    run_sequential_test(size)
    run_sequential_optimized_test(size)
    run_parallel_test(size)

out.close()
out_summary.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment