Skip to content

Instantly share code, notes, and snippets.

@s3rius
Last active December 26, 2024 10:34
Show Gist options
  • Save s3rius/91c39494fe1b96ad467cee671dfdf5ec to your computer and use it in GitHub Desktop.
Save s3rius/91c39494fe1b96ad467cee671dfdf5ec to your computer and use it in GitHub Desktop.
Taskiq performance benchmark

Here's a small comparison between taskiq, celery and dramatiq.

This benchmark is a modified version of an original dramatiq benchmark.

To run the benchmark locally:

docker compose up -d --wait      # To startup services used for testing.
pip install -r requirements.txt  # To install python dependencies
python runner.py --retries 10 -t 10 -t 100 -t 1_000 -t 10_000
docker compose down              # To remove all services used for benching

Note

Hardware used for benchmarks:

CPU: AMD Ryzen 5 7530U with Radeon Graphics (12) @ 4.546GHz

RAM: 16 GiB

OS: Arch Linux x86_64

Kernel: 6.12.4-arch1-1

Python: 3.12.7

Results for Redis (median time in seconds; full script output is below):

| Number of tasks | Taskiq | Dramatiq (sys threads) | Celery (sys threads) | Dramatiq (green threads) | Celery (green threads) |
|---|---|---|---|---|---|
| 10 | 0.4562 | 0.5058 | 1.7826 | 0.6072 | 1.6305 |
| 100 | 0.5064 | 0.5562 | 1.8801 | 0.7590 | 1.8820 |
| 1000 | 0.6587 | 0.7112 | 2.6901 | 1.0627 | 4.4507 |
| 10000 | 1.9126 | 2.2488 | 11.5926 | 2.4156 | 30.8411 |
Unformatted output
python runner.py -r 20 -t 10 -t 100 -t 1_000 -t 10_000 --bench fib -o results.txt
Taskiq:
      N=10:
              Min   : 0.4547
              Max   : 0.5601
              Mean  : 0.4836
              Median: 0.4562
              p(90) : 0.5520
      N=100:
              Min   : 0.5054
              Max   : 0.5583
              Mean  : 0.5142
              Median: 0.5064
              p(90) : 0.5577
      N=1000:
              Min   : 0.6068
              Max   : 0.7131
              Mean  : 0.6490
              Median: 0.6587
              p(90) : 0.7037
      N=10000:
              Min   : 1.7787
              Max   : 2.0475
              Mean  : 1.8994
              Median: 1.9126
              p(90) : 1.9992
Dramatiq (sys threads):
      N=10:
              Min   : 0.4547
              Max   : 0.5080
              Mean  : 0.5034
              Median: 0.5058
              p(90) : 0.5076
      N=100:
              Min   : 0.5053
              Max   : 0.5720
              Mean  : 0.5400
              Median: 0.5562
              p(90) : 0.5619
      N=1000:
              Min   : 0.7087
              Max   : 0.8155
              Mean  : 0.7364
              Median: 0.7112
              p(90) : 0.8105
      N=10000:
              Min   : 2.0921
              Max   : 2.3000
              Mean  : 2.2176
              Median: 2.2488
              p(90) : 2.2643
Dramatiq (green threads):
      N=10:
              Min   : 0.5559
              Max   : 0.7157
              Mean  : 0.6161
              Median: 0.6072
              p(90) : 0.7032
      N=100:
              Min   : 0.7577
              Max   : 0.8656
              Mean  : 0.7795
              Median: 0.7590
              p(90) : 0.8642
      N=1000:
              Min   : 1.0608
              Max   : 1.1691
              Mean  : 1.0891
              Median: 1.0627
              p(90) : 1.1635
      N=10000:
              Min   : 2.2583
              Max   : 2.8735
              Mean  : 2.5140
              Median: 2.4156
              p(90) : 2.8707
Celery (sys threads):
      N=10:
              Min   : 1.7338
              Max   : 1.8338
              Mean  : 1.7866
              Median: 1.7826
              p(90) : 1.8333
      N=100:
              Min   : 1.8266
              Max   : 1.9353
              Mean  : 1.8699
              Median: 1.8801
              p(90) : 1.9296
      N=1000:
              Min   : 2.6389
              Max   : 2.7921
              Mean  : 2.7100
              Median: 2.6901
              p(90) : 2.7861
      N=10000:
              Min   : 11.4401
              Max   : 11.7421
              Mean  : 11.5875
              Median: 11.5926
              p(90) : 11.6921
Celery (green threads):
      N=10:
              Min   : 1.6250
              Max   : 1.6336
              Mean  : 1.6296
              Median: 1.6305
              p(90) : 1.6329
      N=100:
              Min   : 1.8778
              Max   : 1.9344
              Mean  : 1.8841
              Median: 1.8820
              p(90) : 1.8852
      N=1000:
              Min   : 4.4404
              Max   : 4.5507
              Mean  : 4.4700
              Median: 4.4507
              p(90) : 4.5016
      N=10000:
              Min   : 30.5624
              Max   : 31.5718
              Mean  : 30.8576
              Median: 30.8411
              p(90) : 31.1695

Results for RabbitMQ (median time in seconds; full script output is below):

| Number of tasks | Taskiq | Dramatiq (sys threads) | Celery (sys threads) | Dramatiq (green threads) | Celery (green threads) |
|---|---|---|---|---|---|
| 10 | 0.5057 | 0.5312 | 1.8818 | 0.6325 | 1.6296 |
| 100 | 0.5059 | 0.5566 | 1.8873 | 0.7589 | 1.8305 |
| 1000 | 0.6584 | 0.7616 | 2.1360 | 0.9610 | 2.3346 |
| 10000 | 2.1565 | 2.3421 | 4.7314 | 2.3405 | 23.5759 |
Unformatted output
python runner.py -r 10 -t 10 -t 100 -t 1_000 -t 10_000 --bench fib --rmq -o results.txt
Taskiq:
      N=10:
              Min   : 0.4551
              Max   : 0.5084
              Mean  : 0.4959
              Median: 0.5057
              p(90) : 0.5082
      N=100:
              Min   : 0.5051
              Max   : 0.5557
              Mean  : 0.5111
              Median: 0.5059
              p(90) : 0.5511
      N=1000:
              Min   : 0.6084
              Max   : 0.7140
              Mean  : 0.6546
              Median: 0.6584
              p(90) : 0.7135
      N=10000:
              Min   : 2.1088
              Max   : 2.2137
              Mean  : 2.1539
              Median: 2.1565
              p(90) : 2.2091
Dramatiq (sys threads):
      N=10:
              Min   : 0.5055
              Max   : 0.6117
              Mean  : 0.5420
              Median: 0.5312
              p(90) : 0.6114
      N=100:
              Min   : 0.5561
              Max   : 0.6603
              Mean  : 0.5671
              Median: 0.5566
              p(90) : 0.6501
      N=1000:
              Min   : 0.7594
              Max   : 0.8159
              Mean  : 0.7721
              Median: 0.7616
              p(90) : 0.8158
      N=10000:
              Min   : 2.3389
              Max   : 2.4993
              Mean  : 2.3906
              Median: 2.3421
              p(90) : 2.4950
Dramatiq (green threads):
      N=10:
              Min   : 0.6070
              Max   : 0.7134
              Mean  : 0.6384
              Median: 0.6325
              p(90) : 0.7081
      N=100:
              Min   : 0.7080
              Max   : 0.8609
              Mean  : 0.7645
              Median: 0.7589
              p(90) : 0.8561
      N=1000:
              Min   : 0.9099
              Max   : 1.0665
              Mean  : 0.9668
              Median: 0.9610
              p(90) : 1.0612
      N=10000:
              Min   : 2.3099
              Max   : 2.5041
              Mean  : 2.3649
              Median: 2.3405
              p(90) : 2.4960
Celery (sys threads):
      N=10:
              Min   : 1.8330
              Max   : 1.8865
              Mean  : 1.8772
              Median: 1.8818
              p(90) : 1.8864
      N=100:
              Min   : 1.8783
              Max   : 1.9369
              Mean  : 1.9047
              Median: 1.8873
              p(90) : 1.9367
      N=1000:
              Min   : 2.1326
              Max   : 2.1863
              Mean  : 2.1501
              Median: 2.1360
              p(90) : 2.1862
      N=10000:
              Min   : 4.6534
              Max   : 4.8575
              Mean  : 4.7364
              Median: 4.7314
              p(90) : 4.8477
Celery (green threads):
      N=10:
              Min   : 1.6244
              Max   : 1.6328
              Mean  : 1.6282
              Median: 1.6296
              p(90) : 1.6326
      N=100:
              Min   : 1.7803
              Max   : 1.8846
              Mean  : 1.8361
              Median: 1.8305
              p(90) : 1.8844
      N=1000:
              Min   : 2.2786
              Max   : 2.3866
              Mean  : 2.3346
              Median: 2.3346
              p(90) : 2.3865
      N=10000:
              Min   : 23.4233
              Max   : 23.8804
              Mean  : 23.5748
              Median: 23.5759
              p(90) : 23.8553
import argparse
from pathlib import Path
import asyncio
import logging
import os
import random
import subprocess
import sys
import time
import celery
from redis import ConnectionPool as SyncConnectionPool, Redis as SyncRedis
from redis.asyncio import (
BlockingConnectionPool as AsyncConnectionPool,
Redis as AsyncRedis,
)
import dramatiq
from dramatiq.brokers.rabbitmq import RabbitmqBroker
from taskiq_redis import ListQueueBroker
from taskiq_aio_pika import AioPikaBroker
from dramatiq.brokers.redis import RedisBroker
# Module-level logger, registered under the name "example".
logger = logging.getLogger("example")

# Redis key incremented by every finished task; the benchmark driver polls it
# to learn how many messages have been processed.
counter_key = "latench-bench-counter"

# Connection pools used by the task bodies: the synchronous pool by the
# Dramatiq/Celery task functions, the asynchronous one by the taskiq coroutines.
redis_pool = SyncConnectionPool.from_url(url="redis://localhost")
redis_async_pool = AsyncConnectionPool.from_url(url="redis://localhost")

# Fixed seed for the module-level random generator.
random.seed(1337)

# REDIS=1 in the environment selects Redis as the message broker for all three
# frameworks; any other value (or no value) selects RabbitMQ.
if os.getenv("REDIS") != "1":
    dramatiq_broker = RabbitmqBroker(host="127.0.0.1")
    taskiq_broker = AioPikaBroker("amqp://guest:guest@localhost:5672/")
    celery_app = celery.Celery(broker="amqp://guest:guest@localhost:5672/")
else:
    dramatiq_broker = RedisBroker()
    taskiq_broker = ListQueueBroker("redis://localhost")
    celery_app = celery.Celery(broker="redis:///")

# Register whichever Dramatiq broker was chosen as the global one.
dramatiq.set_broker(dramatiq_broker)
def fib_bench(n):
    """Compute the n-th Fibonacci number, then bump the shared progress counter.

    Args:
        n: How many iteration steps to perform; values <= 0 perform none.

    Returns:
        The n-th Fibonacci number (0 when no step is performed).
    """
    current, following = 0, 1
    remaining = n
    while remaining > 0:
        current, following = following, current + following
        remaining -= 1

    # Signal completion of this task to the benchmark driver.
    with SyncRedis(connection_pool=redis_pool) as client:
        client.incr(counter_key)

    return current
async def fib_bench_async(n):
    """Coroutine variant of the Fibonacci benchmark task.

    Args:
        n: How many iteration steps to perform; values <= 0 perform none.

    Returns:
        The n-th Fibonacci number (0 when no step is performed).
    """
    current, following = 0, 1
    remaining = n
    while remaining > 0:
        current, following = following, current + following
        remaining -= 1

    # Signal completion of this task through the asynchronous Redis client.
    async with AsyncRedis(connection_pool=redis_async_pool) as client:
        await client.incr(counter_key)

    return current
def latency_bench():
    """Sleep for a randomly chosen duration, then bump the shared progress counter.

    A roll between 1 and 100 picks the sleep time: 51-100 -> 1 second,
    31-50 -> 3 seconds, 2-30 -> 5 seconds and exactly 1 -> 10 seconds.
    """
    roll = random.randint(1, 100)
    if roll > 50:
        duration = 1
    elif roll > 30:
        duration = 3
    elif roll > 1:
        duration = 5
    else:
        duration = 10

    # Blocking sleep stands in for the simulated work.
    time.sleep(duration)

    with SyncRedis(connection_pool=redis_pool) as client:
        client.incr(counter_key)
async def latency_bench_async():
    """Coroutine variant of the latency benchmark task.

    A roll between 1 and 100 picks the sleep time: 51-100 -> 1 second,
    31-50 -> 3 seconds, 2-30 -> 5 seconds and exactly 1 -> 10 seconds.
    """
    roll = random.randint(1, 100)
    if roll > 50:
        duration = 1
    elif roll > 30:
        duration = 3
    elif roll > 1:
        duration = 5
    else:
        duration = 10

    # Non-blocking sleep stands in for the simulated work.
    await asyncio.sleep(duration)

    async with AsyncRedis(connection_pool=redis_async_pool) as client:
        await client.incr(counter_key)
# Fibonacci benchmark registered with each framework. Dramatiq and Celery wrap
# the synchronous function, taskiq wraps the coroutine.
dramatiq_fib_bench = dramatiq.actor(fib_bench)
celery_fib_bench = celery_app.task(
    name="fib-bench",
    acks_late=True,
)(fib_bench)
taskiq_fib_bench = taskiq_broker.task()(fib_bench_async)

# Latency benchmark registered with each framework, following the same scheme.
dramatiq_latency_bench = dramatiq.actor(latency_bench)
celery_latency_bench = celery_app.task(
    name="latency-bench",
    acks_late=True,
)(latency_bench)
taskiq_latency_bench = taskiq_broker.task()(latency_bench_async)
def benchmark_arg(value):
    """Validate a benchmark name given on the command line (argparse ``type=`` hook).

    Args:
        value: The raw command-line string.

    Returns:
        ``value`` unchanged when it names a known benchmark.

    Raises:
        argparse.ArgumentTypeError: If ``value`` is neither "fib" nor "latency".
    """
    benchmarks = ("fib", "latency")
    if value in benchmarks:
        return value
    raise argparse.ArgumentTypeError(f"benchmark must be one of {benchmarks!r}")
def parse_args():
    """Parse the command-line options of a single benchmark run.

    Returns:
        argparse.Namespace with ``benchmark``, ``count``, ``use_green_threads``,
        ``use_dramatiq``, ``use_celery``, ``use_taskiq`` and ``output``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--benchmark",
        type=benchmark_arg,
        default="latency",
        help="the benchmark to run",
    )
    parser.add_argument(
        "--count",
        type=int,
        default=10000,
        help="the number of messages to benchmark with",
    )
    parser.add_argument(
        "--use-green-threads",
        action="store_true",
        default=False,
        help="run workers with green threads rather than system threads",
    )
    # One boolean switch per framework, added in the same order as before.
    frameworks = (
        ("--use-dramatiq", "Dramatiq"),
        ("--use-celery", "Celery"),
        ("--use-taskiq", "TaskIQ"),
    )
    for flag, framework in frameworks:
        parser.add_argument(
            flag,
            action="store_true",
            default=False,
            help=f"run the benchmark under {framework}",
        )
    parser.add_argument(
        "--output",
        "-o",
        type=Path,
        default=None,
        help="output file for benchmark results",
    )
    return parser.parse_args()
def _worker_command(args):
    """Build the command line that boots the workers for the selected framework."""
    if args.use_celery:
        command = ["celery", "-A", "bench", "worker"]
        if args.use_green_threads:
            command.extend(["-P", "eventlet", "-c", "2000"])
        else:
            command.extend(["-c", "8"])
        return command
    if args.use_dramatiq:
        if args.use_green_threads:
            return ["dramatiq-gevent", "bench", "-p", "8", "-t", "250"]
        return ["dramatiq", "bench", "-p", "8"]
    return [
        "taskiq",
        "worker",
        "bench:taskiq_broker",
        "-w",
        "8",
        "--max-async-tasks",
        "2000",
    ]


async def main(args):
    """Enqueue the benchmark messages, boot the workers and time the processing.

    Args:
        args: Ignored. Kept for interface compatibility; the options are always
            re-parsed from ``sys.argv`` via ``parse_args()``.

    Returns:
        0 once every message has been processed.

    Raises:
        RuntimeError: If the worker process exits before all messages have
            been processed (previously this made the poll loop spin forever).
    """
    args = parse_args()
    await taskiq_broker.startup()
    try:
        if args.use_celery:
            print("Purging Celery...")
            # Best effort: the purge's exit status is deliberately not checked.
            subprocess.run(["celery", "-A", "bench", "purge", "-f"])

        for _ in range(args.count):
            if args.use_celery:
                if args.benchmark == "latency":
                    celery_latency_bench.delay()
                elif args.benchmark == "fib":
                    celery_fib_bench.delay(random.randint(1, 200))
            elif args.use_dramatiq:
                if args.benchmark == "latency":
                    dramatiq_latency_bench.send()
                elif args.benchmark == "fib":
                    dramatiq_fib_bench.send(random.randint(1, 200))
            else:
                if args.benchmark == "latency":
                    await taskiq_latency_bench.kiq()
                elif args.benchmark == "fib":
                    await taskiq_fib_bench.kiq(random.randint(1, 200))

        print("Done enqueing messages. Booting workers...")
        with SyncRedis() as client:
            client.set(counter_key, 0)
            # perf_counter is monotonic, so wall-clock adjustments cannot
            # distort the measured duration.
            start_time = time.perf_counter()
            proc = subprocess.Popen(_worker_command(args))
            try:
                processed = 0
                while processed < args.count:
                    processed = int(client.get(counter_key))
                    print(f"{processed}/{args.count} messages processed\r", end="")
                    if processed < args.count and proc.poll() is not None:
                        raise RuntimeError(
                            f"worker exited with code {proc.returncode} after "
                            f"{processed}/{args.count} messages"
                        )
                    # Yield to the event loop instead of blocking it.
                    await asyncio.sleep(0.05)
                duration = time.perf_counter() - start_time
            finally:
                # Always stop the workers, even on error or Ctrl-C.
                proc.terminate()
                proc.wait()

        if args.output:
            with args.output.open("w") as f:
                f.write(f"{duration}\n")
    finally:
        await taskiq_broker.shutdown()

    print(f"Took {duration} seconds to process {args.count} messages.")
    return 0


if __name__ == "__main__":
    sys.exit(asyncio.run(main(sys.argv)))
# Backing services for the benchmark; start them with `docker compose up -d --wait`.
services:
  # Redis, published on the default port 6379.
  redis:
    image: redis:7-bookworm
    ports:
      - "6379:6379"
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 1s
      timeout: 3s
      retries: 50

  # RabbitMQ, with AMQP on 5672 and the management plugin on 15672.
  rmq:
    image: rabbitmq:3.13-management
    ports:
      - "15672:15672"
      - "5672:5672"
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "check_running", "-q"]
      interval: 3s
      timeout: 3s
      retries: 50
aio-pika==9.5.4 ; python_version >= "3.12" and python_version < "4.0"
aiormq==6.8.1 ; python_version >= "3.12" and python_version < "4.0"
amqp==5.3.1 ; python_version >= "3.12" and python_version < "4.0"
annotated-types==0.7.0 ; python_version >= "3.12" and python_version < "4.0"
anyio==4.7.0 ; python_version >= "3.12" and python_version < "4.0"
billiard==4.2.1 ; python_version >= "3.12" and python_version < "4.0"
celery==5.4.0 ; python_version >= "3.12" and python_version < "4.0"
cffi==1.17.1 ; platform_python_implementation == "CPython" and sys_platform == "win32" and python_version >= "3.12" and python_version < "4.0"
click-didyoumean==0.3.1 ; python_version >= "3.12" and python_version < "4.0"
click-plugins==1.1.1 ; python_version >= "3.12" and python_version < "4.0"
click-repl==0.3.0 ; python_version >= "3.12" and python_version < "4.0"
click==8.1.8 ; python_version >= "3.12" and python_version < "4.0"
colorama==0.4.6 ; python_version >= "3.12" and python_version < "4.0" and platform_system == "Windows"
dnspython==2.7.0 ; python_version >= "3.12" and python_version < "4.0"
dramatiq==1.17.1 ; python_version >= "3.12" and python_version < "4.0"
eventlet==0.38.2 ; python_version >= "3.12" and python_version < "4.0"
exceptiongroup==1.2.2 ; python_version >= "3.12" and python_version < "4.0"
gevent==24.11.1 ; python_version >= "3.12" and python_version < "4.0"
greenlet==3.1.1 ; python_version >= "3.12" and python_version < "4.0"
idna==3.10 ; python_version >= "3.12" and python_version < "4.0"
importlib-metadata==8.5.0 ; python_version >= "3.12" and python_version < "4.0"
kombu==5.4.2 ; python_version >= "3.12" and python_version < "4.0"
multidict==6.1.0 ; python_version >= "3.12" and python_version < "4.0"
packaging==24.2 ; python_version >= "3.12" and python_version < "4.0"
pamqp==3.3.0 ; python_version >= "3.12" and python_version < "4.0"
pika==1.3.2 ; python_version >= "3.12" and python_version < "4.0"
prometheus-client==0.21.1 ; python_version >= "3.12" and python_version < "4.0"
prompt-toolkit==3.0.48 ; python_version >= "3.12" and python_version < "4.0"
propcache==0.2.1 ; python_version >= "3.12" and python_version < "4.0"
pycparser==2.22 ; platform_python_implementation == "CPython" and sys_platform == "win32" and python_version >= "3.12" and python_version < "4.0"
pycron==3.0.0 ; python_version >= "3.12" and python_version < "4.0"
pydantic-core==2.27.2 ; python_version >= "3.12" and python_version < "4.0"
pydantic==2.10.4 ; python_version >= "3.12" and python_version < "4.0"
python-dateutil==2.9.0.post0 ; python_version >= "3.12" and python_version < "4.0"
pytz==2024.2 ; python_version >= "3.12" and python_version < "4.0"
redis==5.2.1 ; python_version >= "3.12" and python_version < "4.0"
setuptools==75.6.0 ; python_version >= "3.12" and python_version < "4.0"
six==1.17.0 ; python_version >= "3.12" and python_version < "4.0"
sniffio==1.3.1 ; python_version >= "3.12" and python_version < "4.0"
taskiq-aio-pika==0.4.1 ; python_version >= "3.12" and python_version < "4.0"
taskiq-dependencies==1.5.6 ; python_version >= "3.12" and python_version < "4.0"
taskiq-redis==1.0.2 ; python_version >= "3.12" and python_version < "4.0"
taskiq==0.11.10 ; python_version >= "3.12" and python_version < "4.0"
typing-extensions==4.12.2 ; python_version >= "3.12" and python_version < "4.0"
tzdata==2024.2 ; python_version >= "3.12" and python_version < "4.0"
vine==5.1.0 ; python_version >= "3.12" and python_version < "4.0"
wcwidth==0.2.13 ; python_version >= "3.12" and python_version < "4.0"
yarl==1.18.3 ; python_version >= "3.12" and python_version < "4.0"
zipp==3.21.0 ; python_version >= "3.12" and python_version < "4.0"
zope-event==5.0 ; python_version >= "3.12" and python_version < "4.0"
zope-interface==7.2 ; python_version >= "3.12" and python_version < "4.0"
import subprocess
import sys
import pathlib
import statistics
import os
import tempfile
import argparse
from typing import TextIO
def parse_args():
    """Parse the benchmark runner's command-line options.

    Returns:
        argparse.Namespace with ``rmq`` (bool), ``retries`` (int), ``bench``
        ("fib" or "latency"), ``output`` (pathlib.Path or None) and ``tasks``
        (list of int, one entry per ``-t`` option).

    Exits with a usage error (status 2) when no ``-t/--tasks`` value is given
    or when ``--retries`` is below 1, because such a run would measure nothing.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--rmq",
        help="Whether to use RabbitMQ for this benchmark",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "-r",
        "--retries",
        help="Number of retries",
        type=int,
        default=10,
    )
    parser.add_argument(
        "--bench",
        help="The benchmark to run",
        choices=["fib", "latency"],
        default="fib",
    )
    parser.add_argument(
        "--output",
        "-o",
        help="File to write the results to, in addition to stdout",
        type=pathlib.Path,
        default=None,
    )
    parser.add_argument(
        "--tasks",
        "-t",
        help="Number of tasks to benchmark with; repeat the option for several sizes",
        action="append",
        type=int,
        default=[],
    )
    args = parser.parse_args()
    if not args.tasks:
        parser.error("at least one -t/--tasks value is required")
    if args.retries < 1:
        parser.error("--retries must be at least 1")
    return args
def run_bench(args: list[str], retries: int, redis: bool = False) -> list[float]:
    """Run one benchmark command repeatedly and collect the reported durations.

    Each run is invoked as ``args + ["-o", <temp file>]`` and is expected to
    write a single float (the duration) into that file.

    Args:
        args: Command line of the benchmark process, without the ``-o`` option.
        retries: How many times to run the command.
        redis: When true the child gets ``REDIS=1`` in its environment; when
            false any inherited ``REDIS`` variable is removed.

    Returns:
        One duration per run, in run order.

    Raises:
        RuntimeError: If a benchmark process exits with a non-zero status.
        ValueError: If the result file does not contain a float.
    """
    child_env = dict(os.environ)
    if redis:
        child_env["REDIS"] = "1"
    else:
        # An inherited REDIS=1 would otherwise leak into the child and silently
        # turn a RabbitMQ run into a Redis run.
        child_env.pop("REDIS", None)

    results = []
    for attempt in range(retries):
        print(f"Running bench {attempt}")
        with tempfile.NamedTemporaryFile("r") as tmp:
            completed = subprocess.run([*args, "-o", tmp.name], env=child_env)
            if completed.returncode != 0:
                raise RuntimeError(
                    f"Bench failed with exit code {completed.returncode}"
                )
            results.append(float(tmp.read()))
    return results
def print_bench_res(io: TextIO, bench_res: dict[int, list[float]]):
    """Write min/max/mean/median/p90 statistics for each task count.

    Args:
        io: Writable text stream that receives the report.
        bench_res: Maps a task count to the durations measured for it.

    A single sample is reported as its own 90th percentile, because
    ``statistics.quantiles`` needs at least two data points before Python 3.13.
    A task count without samples gets a one-line "no samples" note instead of
    raising.
    """
    for num, samples in bench_res.items():
        if not samples:
            io.write(f"\tN={num}: no samples\n")
            continue
        if len(samples) > 1:
            # Last of the nine decile cut points, i.e. the 90th percentile.
            p90 = statistics.quantiles(samples, n=10)[8]
        else:
            p90 = samples[0]
        io.writelines(
            [
                f"\tN={num}:\n",
                f"\t\tMin : {min(samples):.4f}\n",
                f"\t\tMax : {max(samples):.4f}\n",
                f"\t\tMean : {statistics.mean(samples):.4f}\n",
                f"\t\tMedian: {statistics.median(samples):.4f}\n",
                f"\t\tp(90) : {p90:.4f}\n",
            ]
        )
def write_res(io: TextIO, results: dict[str, dict[int, list[float]]]):
    """Write one titled statistics section per benchmarked framework.

    Args:
        io: Writable text stream that receives the report.
        results: Maps a framework label to its per-task-count samples.
    """
    for label in results:
        per_count = results[label]
        io.write(f"{label}:\n")
        # Flush after the heading and again after the section body.
        io.flush()
        print_bench_res(io, per_count)
        io.flush()
def main():
    """Benchmark every framework/threading variant and report the statistics.

    For each variant and each requested task count, ``bench.py`` is run
    ``args.retries`` times. The results are printed to stdout and, when
    ``--output`` is given, also written to that file.
    """
    args = parse_args()
    # sys.executable runs bench.py under the same interpreter (and virtualenv)
    # as this runner, rather than whichever "python" comes first on PATH.
    base_args = [sys.executable, "bench.py", "--benchmark", args.bench, "--count"]

    # (report label, extra bench.py flags), listed in execution order.
    variants = [
        ("Taskiq", ["--use-taskiq"]),
        ("Celery (sys threads)", ["--use-celery"]),
        ("Celery (green threads)", ["--use-celery", "--use-green-threads"]),
        ("Dramatiq (sys threads)", ["--use-dramatiq"]),
        ("Dramatiq (green threads)", ["--use-dramatiq", "--use-green-threads"]),
    ]
    measured = {
        label: {
            count: run_bench(
                [*base_args, str(count), *flags],
                redis=not args.rmq,
                retries=args.retries,
            )
            for count in args.tasks
        }
        for label, flags in variants
    }

    print(" results ".center(100, "="))
    # The report lists Dramatiq before Celery, unlike the execution order.
    report_order = (
        "Taskiq",
        "Dramatiq (sys threads)",
        "Dramatiq (green threads)",
        "Celery (sys threads)",
        "Celery (green threads)",
    )
    all_results = {label: measured[label] for label in report_order}
    write_res(sys.stdout, all_results)
    if args.output is not None:
        print(f"Writing results to {args.output}")
        with args.output.open("w") as w:
            write_res(w, all_results)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment