Last active
June 18, 2023 21:32
-
-
Save chrismay/57c1e166ae6831bfdb38a634ba4e7fa1 to your computer and use it in GitHub Desktop.
PySpark: parallel HTTP requests
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import aiohttp | |
import asyncio | |
import time | |
from pyspark.sql import SparkSession | |
from pyspark.sql.functions import col, collect_list,udf | |
spark = SparkSession.builder.getOrCreate()

# Request ids 1..10000 inclusive (range's stop is exclusive), so the benchmark
# issues exactly 10,000 requests.
data = range(1, 10001)
df = spark.createDataFrame([(i,) for i in data], ["value"])

# Bucket the ids by `value % 500` into 500 groups of 20 ids each. Each row of
# grouped_df then carries one batch ("values") for a single UDF call.
grouped_df = (
    df.withColumn("mod", col("value") % 500)
    .groupBy("mod")
    .agg(collect_list("value").alias("values"))
)
async def fetch(url, session):
    """GET *url* through *session* and report the outcome as a 3-tuple.

    Returns ``(url, "OK", body)`` with the raw response bytes on success. If
    any exception is raised while requesting or reading, the exception is
    printed and ``(url, "ERROR", str(exception))`` is returned instead.

    NOTE(review): the response status code is never inspected, so an HTTP
    error response (e.g. 500) is reported as "OK" unless the session itself
    was configured to raise on error statuses.
    """
    try:
        async with session.get(url) as resp:
            body = await resp.read()
    except Exception as exc:
        # Best-effort per URL: one failed request must not abort the batch.
        print(exc)
        return (url, "ERROR", str(exc))
    return (url, "OK", body)
async def run(url_list):
    """Fetch every URL in *url_list* concurrently over one shared aiohttp session.

    Returns the ``asyncio.gather`` future itself (already awaited, hence done),
    not the list of results. Callers unwrap it with ``.result()`` to get the
    ``fetch`` tuples, in the same order as *url_list*.
    """
    async with aiohttp.ClientSession() as session:
        # Schedule one task per URL up front so they all run on the loop together.
        pending = [asyncio.ensure_future(fetch(u, session)) for u in url_list]
        combined = asyncio.gather(*pending)
        await combined
    return combined
def request_multi(counter_list):
    """Request ``http://localhost:8080/?req=<i>`` for every ``i`` in *counter_list*.

    Used as the body of a Spark UDF: each call drives its own asyncio event
    loop to completion, so all requests for one batch run concurrently.

    Args:
        counter_list: iterable of request ids (one group's ``values`` list).

    Returns:
        list of ``(url, status, payload)`` tuples as produced by ``fetch``,
        in the same order as *counter_list*.
    """
    url_list = [f"http://localhost:8080/?req={i}" for i in counter_list]
    # asyncio.run creates a fresh loop, runs run() to completion and closes the
    # loop afterwards. Calling get_event_loop() with no running loop is
    # deprecated, and the old code never closed the loop it obtained.
    gathered = asyncio.run(run(url_list))
    # run() hands back the completed gather future; unwrap it to the result list.
    return gathered.result()
# Expose request_multi as a Spark SQL UDF. No returnType is passed, so
# PySpark's default (StringType) applies.
# NOTE(review): request_multi returns a list of tuples, not a str; confirm how
# that is rendered in the "result" column if the column is ever read.
multi_req_udf = udf(f=request_multi)
def noop(*args):
    """Ignore all positional arguments; used as a do-nothing ``foreach`` callback."""
    return None
start = time.perf_counter()
results = grouped_df.withColumn("result", multi_req_udf("values"))
# withColumn is lazy. foreach is an action that hands every Row, including the
# UDF-computed "result" column, to the callback, so it forces all requests to
# run. foreach returns None, so its result is not kept.
results.foreach(noop)
duration = time.perf_counter() - start
# Report the real number of requests issued rather than a hard-coded count.
print(f"{len(data)} reqs took {duration} s")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment