Skip to content

Instantly share code, notes, and snippets.

@EKami
Last active March 11, 2025 10:03
Show Gist options
  • Save EKami/136a013c729baa231ef892bfdd40fe5e to your computer and use it in GitHub Desktop.
Save EKami/136a013c729baa231ef892bfdd40fe5e to your computer and use it in GitHub Desktop.
Py -> Go communication

The goal here is to be able to run Python code from Go while keeping everything as simple as possible by not having to spawn external services such as Valkey/Memcache for inter-process communication. We want to be able to ship a single Go binary (the Python code can be embedded in the Go binary with Go embed).

We use several pieces of technologies to achieve this goal:

  1. Pixi allows us to create a self-contained Conda env. Instead of having to rely on venv activate before running our code, we can simply pixi run and the env in the .pixi folder will automatically be used. Pixi is also a single binary file that doesn't require system-wide installation.
  2. NATS allows us to start a server that listen to the response from our Python script directly from our Go code (without requiring a separate service)

Simply put, the GO helper code looks like this:

// Put all this code in `app/internal/python/python.go` including the python scripts (in subfolders)
package python

import (
	"app/internal/util"
	"errors"
	"github.com/google/uuid"
	"github.com/nats-io/nats-server/v2/server"
	"github.com/nats-io/nats.go"
	"log"
	"os"
	"os/exec"
	"path"
	"time"
)

var nc *nats.Conn

// initNatsServer Could be moved to a simple init() but it's often a bad idea to put anything in init()
func initNatsServer() {
	opts := &server.Options{
		Host:     "localhost",
		Port:     4222,
		Username: "Baguette",
		Password: "PainAuChocolatNotChocolatine",
		// Disable logging for cleaner output
		NoLog:  true,
		NoSigs: true,
	}
	ns, err := server.NewServer(opts)
	if err != nil {
		log.Fatal(err)
	}
	go ns.Start()
	if !ns.ReadyForConnections(5 * time.Second) {
		log.Fatal("NATS server failed to start")
	}
	nc, err = nats.Connect(
		ns.ClientURL(),
		nats.UserInfo(opts.Username, opts.Password),
	)
	if err != nil {
		log.Fatal("Client connection failed:", err)
	}
}

func RunDarwinRegisterAnnotation(darwinApiKey string, params map[string]string) (<-chan string, error) {
	mergedParams := map[string]string{
		"darwin_api_key": darwinApiKey,
	}
	for k, v := range params {
		mergedParams[k] = v
	}
	pyRoot := path.Join(util.GetProjectRoot(), "internal", "python")
	return RunPythonScript(path.Join(pyRoot, "darwin_register_annotation", "main.py"), mergedParams)
}

func RunPythonScript(scriptName string, params map[string]string) (<-chan string, error) {
	return PyVenvExec("python "+scriptName, params)
}

func PyVenvExec(scriptName string, params map[string]string) (<-chan string, error) {
	if nc == nil {
		initNatsServer()
	}

	args := make([]string, 0, len(params)*2)
	for k, v := range params {
		args = append(args, "--"+k, v)
	}

	cmd := exec.Command(
		"pixi",
		append([]string{"run", scriptName}, args...)...,
	)
	venvPath := path.Join(util.GetProjectRoot(), ".pixi", "envs", "default")
	newPath := "PATH=" + path.Join(venvPath, "bin") + ":" + os.Getenv("PATH")
	natsChUUID := uuid.New().String()
	cmd.Env = append(os.Environ(), newPath, "NATS_CHANNEL="+natsChUUID)

	ch := make(chan string)
	go waitForPyResponse(natsChUUID, ch)

	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	err := cmd.Run()
	return ch, err
}

func waitForPyResponse(natsChanID string, ch chan string) {
	sub, err := nc.SubscribeSync(natsChanID)
	if err != nil {
		log.Fatal("Subscription failed:", err)
	}
	defer func(sub *nats.Subscription) {
		err := sub.Drain()
		if err != nil {
			log.Fatal("Drain failed:", err)
		}
	}(sub)

	// Block with timeout (large one in case of long-running tasks)
	msg, err := sub.NextMsg(time.Minute * 30)
	if err != nil {
		if errors.Is(err, nats.ErrTimeout) {
			log.Fatal("Timeout while waiting for response from the Python program")
		}
		log.Fatal("Message error:", err)
	}

	ch <- string(msg.Data)
}

Now to call this code externally we simply do:

package main

import "app/internal/python"

func main() {
  // Those are inputs to the python script
	params := map[string]string{
		"slide_id":         "something",
		"annotations_json": `{"key": "val"}`,
	}
	response, err := python.RunDarwinRegisterAnnotation(apiKey, params)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(<-response)
}

And the darwin_register_annotation/main.py looks like this:

import json
import click
import os
import asyncio
from datetime import datetime
from pathlib import Path

from nats.aio.client import Client as NATS


def register_annotations_in_darwin(api_key, team_slug, dataset_slug, slide_id, annotations_json):
    # Register stuff here then return the response
    return json.dumps({"test": "response data"})


async def send_message(channel_id: str, message: str):
    nc = NATS()
    await nc.connect(
        servers=["nats://Baguette:PainAuChocolatNotChocolatine@localhost:4222"]
    )
    await nc.publish(channel_id, message.encode())
    await nc.drain()


@click.command(help='Register AI annotations in V7')
@click.option(
    '--darwin_api_key', type=str, help='The API key to use to authenticate to Darwin V7'
)
@click.option(
    '--slide_id', type=str, required=True, help='The slide ID to register the annotation in'
)
@click.option(
    '--team_slug', type=str, default="pathologywatch"
)
@click.option(
    '--dataset_slug', default="auto-annotation-test", type=str
)
@click.option(
    '--annotations_json', type=str, required=True, help='A string containing the annotations in the Darwin format'
)
def main(darwin_api_key, slide_id, team_slug, dataset_slug, annotations_json):
    start_time = datetime.now()
    output = register_annotations_in_darwin(darwin_api_key, team_slug, dataset_slug, slide_id, annotations_json)

    # Only publish the output if run from a Go process, otherwise this part is ignored which allows us to also run our Python script as a regular script without NATS
    nats_chan = os.getenv("NATS_CHANNEL")
    if nats_chan is not None:
        print(f"Publishing on {nats_chan}")
        asyncio.run(send_message(nats_chan, output))
    print(f"Total time spend: (hh:mm:ss.ms) {datetime.now() - start_time}\n")


if __name__ == "__main__":
    main()

The pixi.toml containing the dependencies looks like this:

[project]
authors = ["Tuatini GODARD <[email protected]>"]
channels = ["conda-forge"]
description = "Builder for the Darwin integration"
name = "the-tool"
platforms = ["osx-arm64", "linux-64"]
version = "0.1.0"

[dependencies]
python = "3.12.*"
boto3 = "*"
click = "*"
tqdm = "*"
requests = "*"
py-opencv = ">=4.11.0,<5"
numpy = ">=2.2.3,<3"
nats-py = ">=2.9.0,<3"

[pypi-dependencies]
darwin-py = { version = "*", extras = ["ocv"] }

The whole thing runs with go run ., with the only external binary required is pixi (and its env files) which can too, be embed in the Go binary (with Go embed) or simply added to the Dockerfile docker image.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment