The goal here is to be able to run Python code from Go while keeping everything as simple as possible by not having to spawn external services such as Valkey/Memcache for inter-process communication. We want to be able to ship a single Go binary (the Python code can be embedded in the Go binary with Go embed).
We use several technologies to achieve this goal:
- Pixi allows us to create a self-contained Conda env. Instead of having to rely on `venv activate` before running our code, we can simply `pixi run` and the env in the `.pixi` folder will automatically be used. Pixi is also a single binary file that doesn't require system-wide installation.
- NATS allows us to start a server that listens for the response from our Python script directly from our Go code (without requiring a separate service).
Simply put, the Go helper code looks like this:
// Put all this code in `app/internal/python/python.go` including the python scripts (in subfolders)
package python
import (
"app/internal/util"
"errors"
"github.com/google/uuid"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
"log"
"os"
"os/exec"
"path"
"time"
)
// nc is the package-wide NATS client connection. It is nil until
// initNatsServer has run; PyVenvExec checks for nil to initialise it lazily.
var nc *nats.Conn
// initNatsServer boots an in-process NATS server on localhost:4222 and stores
// an authenticated client connection in the package-level nc variable.
//
// This could live in a plain init(), but putting side effects in init() is
// often a bad idea, so it is called lazily instead. Any failure (server
// creation, startup within 5 seconds, client connection) terminates the
// process through log.Fatal.
func initNatsServer() {
	const (
		natsUser     = "Baguette"
		natsPassword = "PainAuChocolatNotChocolatine"
	)

	natsServer, err := server.NewServer(&server.Options{
		Host:     "localhost",
		Port:     4222,
		Username: natsUser,
		Password: natsPassword,
		// Keep the embedded server quiet and leave signal handling to the host app.
		NoLog:  true,
		NoSigs: true,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Start blocks for the lifetime of the server, so run it in the background
	// and wait until it accepts connections.
	go natsServer.Start()
	if ready := natsServer.ReadyForConnections(5 * time.Second); !ready {
		log.Fatal("NATS server failed to start")
	}

	conn, err := nats.Connect(natsServer.ClientURL(), nats.UserInfo(natsUser, natsPassword))
	if err != nil {
		log.Fatal("Client connection failed:", err)
	}
	nc = conn
}
// RunDarwinRegisterAnnotation runs the darwin_register_annotation/main.py
// script located under <project root>/internal/python.
//
// darwinApiKey is passed to the script as "darwin_api_key"; every entry of
// params is passed as well (an entry named "darwin_api_key" in params takes
// precedence). The returned channel and error are those of RunPythonScript.
func RunDarwinRegisterAnnotation(darwinApiKey string, params map[string]string) (<-chan string, error) {
	scriptArgs := make(map[string]string, len(params)+1)
	scriptArgs["darwin_api_key"] = darwinApiKey
	for name, value := range params {
		scriptArgs[name] = value
	}

	scriptPath := path.Join(
		util.GetProjectRoot(),
		"internal", "python",
		"darwin_register_annotation", "main.py",
	)
	return RunPythonScript(scriptPath, scriptArgs)
}
// RunPythonScript runs the Python script at scriptName inside the Pixi
// environment by delegating to PyVenvExec with a "python <scriptName>"
// command. params are forwarded unchanged.
func RunPythonScript(scriptName string, params map[string]string) (<-chan string, error) {
	command := "python " + scriptName
	return PyVenvExec(command, params)
}
func PyVenvExec(scriptName string, params map[string]string) (<-chan string, error) {
if nc == nil {
initNatsServer()
}
args := make([]string, 0, len(params)*2)
for k, v := range params {
args = append(args, "--"+k, v)
}
cmd := exec.Command(
"pixi",
append([]string{"run", scriptName}, args...)...,
)
venvPath := path.Join(util.GetProjectRoot(), ".pixi", "envs", "default")
newPath := "PATH=" + path.Join(venvPath, "bin") + ":" + os.Getenv("PATH")
natsChUUID := uuid.New().String()
cmd.Env = append(os.Environ(), newPath, "NATS_CHANNEL="+natsChUUID)
ch := make(chan string)
go waitForPyResponse(natsChUUID, ch)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Run()
return ch, err
}
func waitForPyResponse(natsChanID string, ch chan string) {
sub, err := nc.SubscribeSync(natsChanID)
if err != nil {
log.Fatal("Subscription failed:", err)
}
defer func(sub *nats.Subscription) {
err := sub.Drain()
if err != nil {
log.Fatal("Drain failed:", err)
}
}(sub)
// Block with timeout (large one in case of long-running tasks)
msg, err := sub.NextMsg(time.Minute * 30)
if err != nil {
if errors.Is(err, nats.ErrTimeout) {
log.Fatal("Timeout while waiting for response from the Python program")
}
log.Fatal("Message error:", err)
}
ch <- string(msg.Data)
}
Now to call this code externally we simply do:
package main
import (
	"fmt"
	"log"
	"os"

	"app/internal/python"
)
// main shows how to call the Python helper from Go and print the reply the
// script publishes back.
func main() {
	// Example source for the Darwin API key (the variable name is illustrative);
	// adapt this to however your application stores its secrets.
	apiKey := os.Getenv("DARWIN_API_KEY")

	// Those are inputs to the python script
	params := map[string]string{
		"slide_id":         "something",
		"annotations_json": `{"key": "val"}`,
	}
	response, err := python.RunDarwinRegisterAnnotation(apiKey, params)
	if err != nil {
		log.Fatal(err)
	}
	// Blocks until the Python script's response is available on the channel.
	fmt.Println(<-response)
}
And the `darwin_register_annotation/main.py` script looks like this:
import json
import click
import os
import asyncio
from datetime import datetime
from pathlib import Path
from nats.aio.client import Client as NATS
def register_annotations_in_darwin(api_key, team_slug, dataset_slug, slide_id, annotations_json):
    """Placeholder for the Darwin registration logic.

    The arguments are currently unused; the function returns a fixed JSON
    string standing in for the real response.
    """
    # Register stuff here then return the response
    response = {"test": "response data"}
    return json.dumps(response)
async def send_message(channel_id: str, message: str):
    """Publish ``message`` (encoded to bytes) on the NATS subject ``channel_id``.

    Connects to the NATS server on localhost:4222, publishes once, then drains
    the connection.
    """
    client = NATS()
    server_url = "nats://Baguette:PainAuChocolatNotChocolatine@localhost:4222"
    await client.connect(servers=[server_url])
    payload = message.encode()
    await client.publish(channel_id, payload)
    await client.drain()
@click.command(help='Register AI annotations in V7')
@click.option('--darwin_api_key', type=str, help='The API key to use to authenticate to Darwin V7')
@click.option('--slide_id', type=str, required=True, help='The slide ID to register the annotation in')
@click.option('--team_slug', type=str, default="pathologywatch")
@click.option('--dataset_slug', default="auto-annotation-test", type=str)
@click.option(
    '--annotations_json',
    type=str,
    required=True,
    help='A string containing the annotations in the Darwin format',
)
def main(darwin_api_key, slide_id, team_slug, dataset_slug, annotations_json):
    """Register the annotations, publish the result over NATS if requested, and print the elapsed time."""
    started_at = datetime.now()
    result = register_annotations_in_darwin(
        darwin_api_key, team_slug, dataset_slug, slide_id, annotations_json
    )

    # NATS_CHANNEL is only set when launched from the Go process. Without it the
    # publish step is skipped, so the script also works as a regular standalone
    # script without NATS.
    channel = os.environ.get("NATS_CHANNEL")
    if channel is not None:
        print(f"Publishing on {channel}")
        asyncio.run(send_message(channel, result))

    elapsed = datetime.now() - started_at
    print(f"Total time spend: (hh:mm:ss.ms) {elapsed}\n")


if __name__ == "__main__":
    main()
The `pixi.toml` file containing the dependencies looks like this:
# Project metadata, package channel and target platforms.
[project]
authors = ["Tuatini GODARD <[email protected]>"]
channels = ["conda-forge"]
description = "Builder for the Darwin integration"
name = "the-tool"
platforms = ["osx-arm64", "linux-64"]
version = "0.1.0"
# Packages resolved from the channels listed above.
[dependencies]
python = "3.12.*"
boto3 = "*"
click = "*"
tqdm = "*"
requests = "*"
py-opencv = ">=4.11.0,<5"
numpy = ">=2.2.3,<3"
# nats-py provides the `nats.aio.client` module the script uses to publish its response.
nats-py = ">=2.9.0,<3"
# Packages installed from PyPI.
[pypi-dependencies]
darwin-py = { version = "*", extras = ["ocv"] }
The whole thing runs with `go run .`; the only external binary required is `pixi` (and its env files), which can also be embedded in the Go binary (with Go embed) or simply added to the `Dockerfile` docker image.