# Gist maropu/281bda7e8964158e9c6c131f95262d91, created January 27, 2021 03:08.
# NOTE: GitHub flagged this file as possibly containing bidirectional Unicode
# text that may be interpreted or compiled differently than it appears; review
# it in an editor that reveals hidden Unicode characters.
import functools
import time
from collections import Counter

import pandas as pd
from pyspark.accumulators import AccumulatorParam
from pyspark.sql.functions import col, pandas_udf, PandasUDFType
class UdfMetricAccumulatorParam(AccumulatorParam):
    """AccumulatorParam that merges ``{name: seconds}`` dicts by summing per key.

    Intended for ``SparkContext.accumulator``. Task-side updates such as
    ``{"sleep1_func": 1.0}`` are combined into a single dict of per-name
    totals that the driver can read.
    """

    def zero(self, value):
        """Return the identity element for the merge: a fresh empty dict.

        ``value`` (the accumulator's current value) is intentionally not
        copied into the result. The zero must be neutral under
        ``addInPlace``; otherwise the current totals would be added again
        on merge.
        """
        return {}

    def _mergeDict(self, d1, d2):
        """Return a new dict holding the per-key sums of ``d1`` and ``d2``.

        The sum is explicit rather than ``Counter(d1) + Counter(d2)``.
        Counter addition discards keys whose total is zero or negative,
        which would silently lose metrics.
        """
        merged = dict(d1)
        for key, value in d2.items():
            merged[key] = merged.get(key, 0) + value
        return merged

    def addInPlace(self, d1, d2):
        """Combine two accumulator values and return the result.

        ``None`` on either side is treated as "no data" so that a missing
        value never raises. Neither input dict is mutated.
        """
        if d2 is None:
            return d1
        if d1 is None:
            return d2
        return self._mergeDict(d1, d2)
# Accumulator that gathers per-function elapsed seconds reported by the timed
# helpers (see ``elapsed_time``), starting from an empty dict.
# NOTE(review): ``sc`` is not defined in this file; presumably it is the
# SparkContext predefined by the PySpark shell/notebook — confirm.
metrics = sc.accumulator(value={}, accum_param=UdfMetricAccumulatorParam())
def elapsed_time(f):
    """Decorate ``f`` so that each call adds its elapsed time to ``metrics``.

    After every successful call, ``{f.__name__: seconds}`` is added to the
    module-level ``metrics`` with ``+=``. In this file that is a Spark
    accumulator that sums the seconds per function name.

    Args:
        f: The function to time.

    Returns:
        A wrapper that forwards all arguments to ``f`` and returns its
        result unchanged. ``functools.wraps`` preserves ``f``'s name and
        docstring on the wrapper.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        # ``+=`` rebinds the name, so it must be declared global here.
        global metrics
        # perf_counter() is monotonic and high-resolution; time.time() can
        # jump when the system clock is adjusted, yielding bogus durations.
        start_time = time.perf_counter()
        ret = f(*args, **kwargs)
        metrics += {f.__name__: time.perf_counter() - start_time}
        return ret
    return wrapper
@elapsed_time
def sleep1_func():
    """Sleep for one second; the duration is recorded under 'sleep1_func'."""
    delay_seconds = 1
    time.sleep(delay_seconds)
@elapsed_time
def sleep2_func():
    """Sleep for two seconds; the duration is recorded under 'sleep2_func'."""
    delay_seconds = 2
    time.sleep(delay_seconds)
@elapsed_time
def sleep3_func():
    """Sleep for three seconds; the duration is recorded under 'sleep3_func'."""
    delay_seconds = 3
    time.sleep(delay_seconds)
@pandas_udf('long')
def f(v: pd.Series) -> pd.Series:
    """Scalar pandas UDF that returns ``v`` unchanged after three timed sleeps.

    The ``pd.Series -> pd.Series`` type hints make Spark 3.0+ infer a scalar
    pandas UDF. They replace ``PandasUDFType.SCALAR``, which Spark 3.0 warns
    will be deprecated. Spark calls the function once per Arrow batch, so
    each batch adds roughly 1, 2 and 3 seconds to ``metrics`` under the
    three helpers' names.
    """
    sleep1_func()
    sleep2_func()
    sleep3_func()
    return v
# Run the timed UDF over 128 ids split into 32 partitions.
# NOTE(review): show() prints 20 rows and only evaluates as many partitions as
# it needs for them. The sample output below (~6 calls per helper) suggests
# just 6 of the 32 partitions ran; use a full action such as collect() to time
# the UDF over every partition.
spark.range(128, numPartitions=32).select(f(col("id"))).show()

# Only the driver can read an accumulator's value, and only meaningfully after
# an action has completed. Updates made inside transformations (like this UDF)
# may be applied more than once if tasks are re-executed.
# In v3.0.1, tracking accumulators in the web UI is not supported in Python:
# - https://spark.apache.org/docs/3.0.1/rdd-programming-guide.html#accumulators
# print() makes the result visible when run as a script, not only in a REPL.
print(metrics.value)
# {'sleep1_func': 6.006200551986694, 'sleep2_func': 12.006130933761597, 'sleep3_func': 18.0038845539093}
# GitHub page footer captured with the gist: "Sign up for free to join this
# conversation on GitHub. Already have an account? Sign in to comment."