Skip to content

Instantly share code, notes, and snippets.

@DFilyushin
Last active January 29, 2025 05:03
Show Gist options
  • Save DFilyushin/31469b964d5227f1bc7327a7fba32e85 to your computer and use it in GitHub Desktop.
Save DFilyushin/31469b964d5227f1bc7327a7fba32e85 to your computer and use it in GitHub Desktop.
clickhouse_load_test
import clickhouse_connect
import asyncio
import multiprocessing
import time
import signal
import sys
from datetime import datetime, timedelta
from random import randint
# Конфигурация подключения к ClickHouse
CLICKHOUSE_HOST = 'localhost'
CLICKHOUSE_PORT = 8123
CLICKHOUSE_USER = 'default'
CLICKHOUSE_PASSWORD = 'password'
# Количество клиентов (процессов)
NUM_CLIENTS = 10
# Флаг для остановки клиентов
stop_flag = multiprocessing.Event()
def get_random_query():
random_date = datetime.isoformat(datetime.today() - timedelta(days=randint(1, 365)))[:10]
return f"""
SELECT map(base_fiat, toDecimal64(price_history,10)) AS rate_map
FROM
default.fiat_history
WHERE
fiat_code = 'RUB'
AND abs(dateDiff('second', date_history, parseDateTimeBestEffort('{random_date}'))) = (
SELECT min(abs(dateDiff('second', date_history, parseDateTimeBestEffort('{random_date}'))))
FROM default.fiat_history WHERE fiat_code = 'USD')
"""
async def execute_query(client, query):
start_time = time.time()
result = client.query(query)
print(result.result_rows)
end_time = time.time()
return end_time - start_time
async def client_task(client_id):
client = clickhouse_connect.get_client(
host=CLICKHOUSE_HOST,
port=CLICKHOUSE_PORT,
username=CLICKHOUSE_USER,
password=CLICKHOUSE_PASSWORD
)
query_count = 0
while not stop_flag.is_set():
query = get_random_query()
duration = await execute_query(client, query)
query_count += 1
print(f"Client {client_id}, Query {query_count}: {duration:.2f} seconds")
print(f"Client {client_id} finished. Total queries: {query_count}")
def run_client(client_id):
asyncio.run(client_task(client_id))
def signal_handler(sig, frame):
print("Stopping by Ctrl+C...")
stop_flag.set()
sys.exit(0)
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
# Создаем и запускаем процессы для каждого клиента
processes = []
for i in range(NUM_CLIENTS):
p = multiprocessing.Process(target=run_client, args=(i,))
processes.append(p)
p.start()
for p in processes:
p.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment