Take, for example, this simplified pseudocode for a Hacker News ingest.
import requests

def get_post(id):
    return requests.get(f"https://hacker-news.firebaseio.com/v0/item/{id}.json?print=pretty").json()

def send_to_trieve(json_batch):
    # ...
    requests.post("https://api.trieve.ai/api/chunk", json={
        # ...
    })

last_visited = 0
while True:
    maxitem = int(requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").text)
    batch = []
    # part 1
    for id in range(last_visited, maxitem):
        # part 2
        post_json = get_post(id)
        batch.append(post_json)
        if len(batch) == 120:
            # part 3
            send_to_trieve(batch)
            batch = []
    last_visited = maxitem
This code has two major sections of I/O:
- Sending chunks to Trieve
- Getting data from Firebase
And two long-standing variables:
- last_visited
- batch
The fastest way to parallelize this code is to split the ID range across multiple processes. Spawning multiple threads that share state behind locks may work at small scale, but it typically adds a lot more overhead to the code itself, and it ties you to the hardware you are running on: you now need a single machine with enough cores and memory to manage a very large number of threads.
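For contrast, here is a rough sketch of what that threaded version might look like. The worker count is arbitrary, and get_post, send_to_trieve, last_visited, and maxitem are the names from the code above:

import threading
from queue import Queue

# get_post and send_to_trieve as defined above
id_queue = Queue()
batch = []
batch_lock = threading.Lock()  # every worker contends on this one lock

def worker():
    while True:
        id = id_queue.get()
        post_json = get_post(id)
        # The shared batch must be guarded, and the flush has to happen
        # inside the lock so two threads never send overlapping batches
        with batch_lock:
            batch.append(post_json)
            if len(batch) == 120:
                send_to_trieve(batch)
                batch.clear()
        id_queue.task_done()

threads = [threading.Thread(target=worker, daemon=True) for _ in range(32)]
for t in threads:
    t.start()

for id in range(last_visited, maxitem):
    id_queue.put(id)
id_queue.join()  # block until every queued id has been processed

Even in this toy form, correctness hinges on getting the lock discipline right, and the thread count is still capped by what one machine can schedule.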
A better way to do this, with minimal code overhead, is to use Redis as the system's shared memory.
# set_ids.py
import redis
import requests

redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Resume from the last checkpoint if one exists
last_visited = int(redis_client.get("last_visited") or 0)
while True:
    maxitem = int(requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").text)
    # part 1
    for id in range(last_visited, maxitem):
        redis_client.lpush("ids_to_visit", id)
        redis_client.set("last_visited", id)
    last_visited = maxitem
# fetch_and_send_chunks.py
import redis
import requests

# get_post and send_to_trieve as defined in the first example
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

batch = []
while True:
    # brpop blocks until an id is available and returns a (key, value) pair
    _, raw_id = redis_client.brpop("ids_to_visit")
    id = int(raw_id)
    # part 2
    post_json = get_post(id)
    batch.append(post_json)
    if len(batch) == 120:
        # part 3
        send_to_trieve(batch)
        batch = []
We now have two separate processes: one that cannot be parallelized and one that can. However, we are still facing one issue: the calls to Firebase are a lot slower than the calls to Trieve, partially due to latency and partially because Trieve accepts requests in batches rather than a single item at a time. fetch_and_send_chunks.py is still going to be a pretty nasty bottleneck in our system. We can follow the same pattern as before to separate it out into two programs.
# fetch_chunks.py
import json
import redis
import requests

# get_post as defined in the first example
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

while True:
    # brpop blocks until an id is available and returns a (key, value) pair
    _, raw_id = redis_client.brpop("ids_to_visit")
    id = int(raw_id)
    # part 2
    post_json = get_post(id)
    # Serialize the dict to a JSON string; Redis lists only hold strings/bytes
    redis_client.rpush("to_trieve", json.dumps(post_json))
# send_chunks_to_trieve.py
import json
import time
import redis
import requests

# send_to_trieve as defined in the first example

def deserialize_to_dict(item):
    # decode_responses=True already gives us str, so just parse the JSON
    return json.loads(item)

redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

batch_size = 120
while True:
    # lpop with a count pops up to batch_size items at once (Redis >= 6.2)
    redis_resp = redis_client.lpop("to_trieve", batch_size)
    if redis_resp is None:
        # Queue is empty; back off briefly instead of busy-waiting
        time.sleep(0.1)
        continue
    batch = list(map(deserialize_to_dict, redis_resp))
    send_to_trieve(batch)
Splitting up this last step means that most of the I/O latency bottlenecks can now be horizontally scaled away, at least until network bandwidth limits kick in.
There are a few basic ways to run it:
- tmux (for the insane and the brave)
- docker (with docker-compose)
- kubernetes
Now, I have found that in some cases tmux works a little better than juggling the processes by hand, but it is a pain to set up and scale out, and it has given me plenty of cramps. It does work, though. The biggest issue is that if you hit system limits, the whole tmux session may die.
Well, the first step to docker-compose is Docker. For this example, all three programs fit within the same Dockerfile template; just change the name of the Python file to the one you need.
FROM python:3.12.4-slim-bookworm
RUN apt-get update -y && apt-get -y install pkg-config libssl-dev libpq-dev g++ curl
WORKDIR /app
COPY requirements.txt /app/requirements.txt
RUN pip install -r requirements.txt
COPY send_chunks_to_trieve.py /app/send_chunks_to_trieve.py
CMD ["python", "/app/send_chunks_to_trieve.py"]
and build/push with docker build -t trieve/send_chunks_to_trieve . && docker push trieve/send_chunks_to_trieve
Load it into a docker-compose.yml file as such:
# docker-compose.yml
version: "3"
services:
  set_ids:
    image: trieve/set_ids
    environment:
      REDIS_URL: <redis-uri>
  fetch_chunks:
    image: trieve/fetch_chunks
    environment:
      REDIS_URL: <redis-uri>
  send_chunks_to_trieve:
    image: trieve/send_chunks_to_trieve
    environment:
      REDIS_URL: <redis-uri>
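One wrinkle worth noting: the scripts above hardcode localhost for Redis. To actually pick up the REDIS_URL these services set, each script would swap its connection line for something like this sketch (the fallback URL here is just an assumption for local runs):

import os
import redis

# Build the client from the REDIS_URL env var set by docker-compose or
# Kubernetes, falling back to localhost for local development
redis_client = redis.Redis.from_url(
    os.environ.get("REDIS_URL", "redis://localhost:6379"),
    decode_responses=True,
)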
Then you can scale the services with docker-compose up -d --scale fetch_chunks=20 --scale send_chunks_to_trieve=2 --scale set_ids=1.
This solution is far easier to manage than tmux, but it still suffers from the same single-machine resource limits and some random crashing. If the pipeline needs more than two boxes, managing the different containers across different boxes becomes an issue.
Shockingly, it's not much harder to put the pipeline into Kubernetes. I will skip the steps to create a Kubernetes cluster and show what the deployment file should look like.
# kube-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: set-ids
  labels:
    app.kubernetes.io/name: set-ids
spec:
  replicas: 1 # Set to 1 by default
  selector:
    matchLabels:
      app.kubernetes.io/name: set-ids
  template:
    metadata:
      labels:
        app.kubernetes.io/name: set-ids
    spec:
      containers:
        - name: hn-set-ids
          image: trieve/set_ids
          imagePullPolicy: Always
          env:
            - name: REDIS_URL
              value: <redis-uri>
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fetch-chunks
  labels:
    app.kubernetes.io/name: fetch-chunks
spec:
  replicas: 20 # Set to 20 by default
  selector:
    matchLabels:
      app.kubernetes.io/name: fetch-chunks
  template:
    metadata:
      labels:
        app.kubernetes.io/name: fetch-chunks
    spec:
      containers:
        - name: hn-fetch-chunks
          image: trieve/fetch_chunks
          imagePullPolicy: Always
          env:
            - name: REDIS_URL
              value: <redis-uri>
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: send-chunks-to-trieve
  labels:
    app.kubernetes.io/name: send-chunks-to-trieve
spec:
  replicas: 2 # Set to 2 by default
  selector:
    matchLabels:
      app.kubernetes.io/name: send-chunks-to-trieve
  template:
    metadata:
      labels:
        app.kubernetes.io/name: send-chunks-to-trieve
    spec:
      containers:
        - name: hn-send-chunks-to-trieve
          image: trieve/send_chunks_to_trieve
          imagePullPolicy: Always
          env:
            - name: REDIS_URL
              value: <redis-uri>
To run this deployment, run kubectl apply -f kube-deployment.yaml. If you want to modify any parameters of the deployment, modify the file and rerun the apply command; it will do a diff check and only update the services that changed. To tear down the services, run kubectl delete -f kube-deployment.yaml.