Skip to content

Instantly share code, notes, and snippets.

@cdxker
Last active July 25, 2024 21:02
Show Gist options
  • Save cdxker/7cfa24a6957975a0ce9c65bb10541f47 to your computer and use it in GitHub Desktop.
Save cdxker/7cfa24a6957975a0ce9c65bb10541f47 to your computer and use it in GitHub Desktop.
How to make any ingestion pipeline scale fast and how to run the pipline.

Take for example this simplified pseudo code for hackernews ingest.

import requests

def get_post(id):
  
  return requests.get(f"https://hacker-news.firebaseio.com/v0/item/{id}.json?print=pretty").json()

def send_to_trieve(json_batch):
  # ...
  requests.post("https://api.trieve.ai/api/chunk", {
    # ...
  })
  pass


last_visited = 0
while True:
  maxitem = int(requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").text)

  batch = []
  # part 1
  for id in range(last_visited, maxitem):

	# part 2
    post_json = get_post(id)
    batch.append(post_json)

    if len(batch) == 120:
	  # part 3
      send_to_trieve(batch)

  last_visited = maxitem

This code has 2 major sections of io:

  • Sending chunks to trieve
  • Getting data from firebase

And two long standing variables:

  • last_visited
  • batch

The fastest way to parrallize this chunk of code is to split up the list to have multiple processes handle it. The solution of making multiple threads and sharing code across locks may be a good solution for a small scale, but typically adds a lot more overhead to the code itself. This also introduces a limit of the hardware you are running it on. Now you need a machine that has enough cores and power to manage a very large number of threads.

A better way to do this, with minimal code overhead is to use redis as the systems memory

# set_ids.py
import redis
import requests

redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

last_visited = 0
while True:
  maxitem = int(requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").text)

  batch = []
  # part 1
  for id in range(last_visited, maxitem):
	redis_client.lpush("ids_to_visit", id)
	redis_client.set("last_visited", id)
# fetch_and_send_chunks.py
import redis
import requests

redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

batch = []
while True:

  id = int(redis_client.brpop("ids_to_visit"))
  # part 2
  post_json = get_post(id)
  batch.append(post_json)

  if len(batch) == 120:
	# part 3
	send_to_trieve(batch)

We now have 2 seperate processes. One that is not able to be parrallized and one that is able to be parrallized. However, we are still facing one issue. The calls to firebase are a lot slower than the calls to trieve. Partially due to latency partially due to trieve accepting requests in a batch rather than just single item at a time. fetch_and_send_chunks.py is still going to be a pretty nasty bottleneck in our system. We can follow a similar pattern we did before to seperate this out into 2 programs.

# fetch_chunks.py
import redis
import requests

redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

batch = []
while True:

  id = int(redis_client.brpop("ids_to_visit"))
  # part 2
  post_json = get_post(id)
  redis_client.rpush("to_trieve", post_json)
# send_chunks_to_trieve.py
import redis
import requests

def deserialize_to_dict(item):
    item = item.decode("utf-8")
    json_item = ast.literal_eval(item)

	return json_item

redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

batch_size = 120

while True:
  redis_resp = redis_client.lpop("to_trieve", batch_size)

  if redis_resp is None:
	  continue

  batch = list(map(deserialize_to_dict, redis_resp))
  send_to_trieve(batch)

Splitting up this last step now means that most I/O latency bottlenecks can now be mostly horizontally scaled away until networking bandwidth limitations arrive.

Great, now we made a very good system... How do we run it?

Theres a few basic ways to run it

  • tmux (for the insane and the brave)
  • docker (with docker-compose)
  • kubernetes

TMUX

Now I have found that in some cases, tmux works a little better than multiple files, but it is a pain to setup and scale out. I have had many of cramps with tmux. and scaling out with this, but it does work. The biggest issue is that if you hit system limitations then the whole tmux session may die.

Docker-compose

Well the first step to docker-compsoe is docker. For this example they all can fit witihin a similar Dockerfile template. Just change the name of the python file to the one you need.

FROM python:3.12.4-slim-bookworm

RUN apt-get update -y && apt-get -y install pkg-config libssl-dev libpq-dev g++ curl

WORKDIR /app
COPY requirements.txt /app/requirements.txt

RUN pip install -r requirements.txt
COPY send_chunks_to_trieve.py /app/send_chunks_to_trieve.py

CMD ["python", "/app/send_chunks_to_trieve.py"]

and build/push with docker build -t trieve/send_chunks_to_trieve . && docker push trieve/send_to_trieve

Load it into a docker-compsoe.yml file as such

# docker-compsoe.yml
version: "3"
services:
  set_ids:
    image: trieve/set_ids
    environment:
      REDIS_URL: <redis-uri>
  fetch_chunks:
    image: trieve/fetch_chunks
    environment:
      REDIS_URL: <redis-uri>
  send_chunks_to_trieve:
    image: trieve/send_chunks_to_trieve
    environment:
      REDIS_URL: <redis-uri>

Then you can scale the services with docker-compose up -d --scale fetch_chunks=20 --scale send_chunks_to_trieve=2 --scale set_ids=1.

This solution is far easier to manage than tmux, but still sufferes from the same issue of single system resource limits and some random crashing. If the pipeline needs more than 2 boxes the management of multiple different pods and differnt boxes becomes an issue.

Kubernetes

Shockingly its not much harder to put the pipeline into kubernetes. I will skip the steps to create a kubernetes cluster, show how the deployment file should look like

# kube-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: set-ids
  labels:
    app.kubernetes.io/name: set-ids
spec:
  replicas: 1 # Set to 1 by default
  selector:
    matchLabels:
      app.kubernetes.io/name: set-ids
  template:
    metadata:
      labels:
        app.kubernetes.io/name: set-ids
    spec:
      containers:
      - name: hn-set-ids
        image: trieve/set-ids
        imagePullPolicy: Always
        env:
          - name: REDIS_URL
            value: <redis-uri>
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fetch_chunks
  labels:
    app.kubernetes.io/name: fetch_chunks
spec:
  replicas: 20 # Set to 20 by default
  selector:
    matchLabels:
      app.kubernetes.io/name: fetch_chunks
  template:
    metadata:
      labels:
        app.kubernetes.io/name: fetch_chunks
    spec:
      containers:
      - name: hn-fetch_chunks
        image: trieve/fetch_chunks
        imagePullPolicy: Always
        env:
          - name: REDIS_URL
            value: <redis-uri>
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: send_chunks_to_trieve
  labels:
    app.kubernetes.io/name: send_chunks_to_trieve
spec:
  replicas: 2 # Set to 0 by default
  selector:
    matchLabels:
      app.kubernetes.io/name: send_chunks_to_trieve
  template:
    metadata:
      labels:
        app.kubernetes.io/name: send_chunks_to_trieve
    spec:
      containers:
      - name: hn-send_chunks_to_trieve
        image: trieve/send_chunks_to_trieve
        imagePullPolicy: Always
        env:
          - name: REDIS_URL
            value: <redis-uri>

To run this deployment run kubectl apply -f kube-deployment.yaml. If you want to modify any parameters of the deployment. Modify the file then rerun the apply command, it will do a diff check and only update the services that changed.

To tear down the services run kubectl delete -f kube-deployment.yaml.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment