Skip to content

Instantly share code, notes, and snippets.

@perryism
Last active November 17, 2023 23:44
Show Gist options
  • Save perryism/7a44d95f224d77a3de2b417427867670 to your computer and use it in GitHub Desktop.
Save perryism/7a44d95f224d77a3de2b417427867670 to your computer and use it in GitHub Desktop.
simple api gateway
def handle(body):
    """Tag a decoded response body with ``foo = "bar"``.

    Args:
        body: a mutable mapping (presumably the JSON-decoded upstream
            response handed over by the runner -- confirm against caller).
            It is modified in place.

    Returns:
        The same ``body`` object, now containing the key ``"foo"``.
    """
    body["foo"] = "bar"
    return body
package main
import (
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"os/exec"
"strings"
"time"
)
// config is the routing table: it maps a URL path prefix (e.g. "/local") to
// the upstream Endpoint that serves it. It is filled in by setUp and read by
// route on every request.
var config = make(map[string]Endpoint)
// TODO: make this configurable
func setUp() {
config["/local"] = Endpoint{Url: "http://localhost:8081/predict",
Subpaths: map[string]SubPath{
"/logistic_regression/v1": {
PreProcessor: func(newreq *http.Header) {
newreq.Add("FUNCTION-NAME", "logisticregressionpredictor")
},
PostProcessor: func(body string) string {
return body
},
},
"/linear_regression/v1": {
PreProcessor: func(newreq *http.Header) {
newreq.Add("FUNCTION-NAME", "linregpredictor")
},
PostProcessor: pythonScript("handler.py"),
},
"/linear_regression/v2": {
PreProcessor: func(newreq *http.Header) {
newreq.Add("FUNCTION-NAME", "linregpredictor")
},
PostProcessor: pythonRun(`
def handle(body):
body["foo"] = "test"
return body
`),
},
},
}
config["/vertexai"] = Endpoint{
Url: "https://vertexai.com/:predict",
Subpaths: map[string]SubPath{
"/logistic_regression/v1": {
PreProcessor: func(newreq *http.Header) {
newreq.Add("FUNCTION-NAME", "logisticregressionpredictor")
},
PostProcessor: NullPostProcessor,
},
"/linear_regression/v1": {
PreProcessor: func(newreq *http.Header) {
// TODO: use decorate pattern
// newreq.Add("Content-Type", "application/json")
// newreq.Add("Accept", "application/json")
newreq.Add("FUNCTION-NAME", "linregpredictor")
},
PostProcessor: pythonScript("handler.py"),
},
},
}
}
// getEndpoint resolves a request path against the routing table.
//
// The path is split into a configured prefix (a key of config) and the
// remainder, which is looked up in that endpoint's Subpaths. When several
// prefixes match, the longest one wins, so the result does not depend on Go's
// randomized map iteration order.
//
// It returns the matched Endpoint and SubPath, or an error when no prefix
// matches or when the remainder is not a configured sub-path. On error the
// returned Endpoint and SubPath are zero values.
func getEndpoint(path string, config map[string]Endpoint) (Endpoint, SubPath, error) {
	prefix, found := "", false
	for k := range config {
		if strings.HasPrefix(path, k) && (!found || len(k) > len(prefix)) {
			prefix, found = k, true
		}
	}
	if !found {
		return Endpoint{}, SubPath{}, fmt.Errorf("No endpoints matched from %s", path)
	}

	endpoint := config[prefix]
	subpath := strings.TrimPrefix(path, prefix)
	fmt.Println("subpath", subpath)

	// Use the two-value lookup: a missing key would otherwise yield a zero
	// SubPath whose nil processors blow up later in the request.
	foundSubpath, ok := endpoint.Subpaths[subpath]
	if !ok {
		return Endpoint{}, SubPath{}, fmt.Errorf("no subpath %q configured under %s", subpath, prefix)
	}
	return endpoint, foundSubpath, nil
}
// route is the catch-all HTTP handler. It resolves the request path against
// the package-level config and hands the request to the matching SubPath.
//
// When the path cannot be resolved, the client gets a 404 carrying the
// resolution error and the request is not forwarded.
func route(w http.ResponseWriter, req *http.Request) {
	endpoint, subPath, err := getEndpoint(req.URL.Path, config)
	if err != nil {
		// Stop here: forwarding with an unresolved SubPath would call nil
		// processors and target an empty upstream URL.
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}

	// FIXME: the SubPath only learns which Endpoint it belongs to here;
	// subPath is a copy, so the shared config entry is not modified.
	subPath.Endpoint = endpoint
	subPath.handle(w, req)
}
// PreProcessor adjusts the headers of the outgoing upstream request before it
// is sent. SubPath.handle calls it with a pointer to the new request's Header.
type PreProcessor func(newreq *http.Header)
// PostProcessor transforms the upstream response body, given as a string, into
// the string that SubPath.handle writes back to the client.
type PostProcessor func(body string) string
// NullPostProcessor is the pass-through PostProcessor: it hands the upstream
// body back exactly as it received it.
func NullPostProcessor(body string) string { return body }
// DecoratedRequest bundles the pieces of an outgoing request: method, URL and
// body. It is not referenced by any other code visible in this file.
// NOTE(review): presumably a placeholder for the "decorate pattern" TODO in
// setUp -- confirm before building on it.
type DecoratedRequest struct {
// Method is the HTTP method of the request.
Method string
// Url is the target URL of the request.
Url string
// Body is the reader supplying the request payload.
Body io.Reader
}
// pythonRun turns an inline Python script into a PostProcessor.
//
// The script is written to "tmp.py" in the current working directory and the
// result of pythonScript for that file is returned. The script is expected to
// define handle(body), which is what the Python runner calls.
//
// Limitation: the file name is fixed, and the returned PostProcessor reads
// the file at request time. A second pythonRun call therefore overwrites the
// script used by the first one.
//
// If the script cannot be written the process exits via log.Fatalf, since the
// returned PostProcessor could never work.
func pythonRun(script string) PostProcessor {
	const tmpFile = "tmp.py"
	// defer os.Remove(tmpFile) is not an option here: the file must outlive
	// this call because it is only executed when a request arrives.
	// 0666 (before umask) matches what os.Create would have used.
	if err := ioutil.WriteFile(tmpFile, []byte(script), 0666); err != nil {
		log.Fatalf("writing inline python script to %s: %v", tmpFile, err)
	}
	return pythonScript(tmpFile)
}
// pythonScript returns a PostProcessor that pipes a response body through the
// Python script at scriptPath.
//
// For every call the body is written to a temporary file and
// `python -c "import sys, runner; runner.run(<tmp file>, <scriptPath>)"` is
// executed. Whatever the command prints on stdout becomes the processed body.
// The temporary file is removed before the PostProcessor returns.
//
// Failures never terminate the server. They are logged, and the PostProcessor
// returns an empty string if the body could not be staged, or whatever stdout
// was produced if the command itself failed.
func pythonScript(scriptPath string) PostProcessor {
	// Both paths are passed as argv entries instead of being spliced into the
	// Python source, so quotes or backslashes in a path (e.g. a Windows temp
	// directory) cannot corrupt the snippet.
	const pySnippet = "import sys, runner; runner.run(sys.argv[1], sys.argv[2])"

	return func(body string) string {
		fmt.Printf("Running %s\n", scriptPath)

		tmpFile, err := ioutil.TempFile(os.TempDir(), "prefix-")
		if err != nil {
			log.Printf("Cannot create temporary file: %v", err)
			return ""
		}
		// Clean up the staged body once the script has run.
		defer os.Remove(tmpFile.Name())
		fmt.Println("Created File: " + tmpFile.Name())

		_, err = tmpFile.Write([]byte(body))
		// Always close, but report the write error first if there was one.
		if closeErr := tmpFile.Close(); err == nil {
			err = closeErr
		}
		if err != nil {
			log.Printf("Failed to write to temporary file: %v", err)
			return ""
		}

		fmt.Printf("python -c %q %q %q\n", pySnippet, tmpFile.Name(), scriptPath)
		//FIXME: this should be run in a docker to avoid destructive behavior
		cmd := exec.Command("python", "-c", pySnippet, tmpFile.Name(), scriptPath)
		out, err := cmd.Output()
		if err != nil {
			log.Printf("running %s: %v", scriptPath, err)
			// Output captures stderr in ExitError when cmd.Stderr is unset,
			// which is where the Python traceback ends up.
			if exitErr, ok := err.(*exec.ExitError); ok && len(exitErr.Stderr) > 0 {
				log.Printf("python stderr: %s", exitErr.Stderr)
			}
		}
		fmt.Println("out", string(out))
		return string(out)
	}
}
// SubPath describes how one route below an Endpoint is proxied: how the
// outgoing request is prepared and how the upstream response is transformed.
type SubPath struct {
// PreProcessor is called by handle with the outgoing request's headers
// before the request is sent upstream.
PreProcessor PreProcessor
// PostProcessor is applied by handle to the upstream response body; its
// result is what the client receives.
PostProcessor PostProcessor
// Subpaths is not read by any code visible in this file.
// NOTE(review): presumably reserved for nested routes -- confirm.
Subpaths map[string]SubPath
// Endpoint is the parent endpoint. route assigns it on the per-request copy
// just before calling handle, which forwards to Endpoint.Url.
Endpoint Endpoint
}
// Endpoint is one upstream service registered in config under a path prefix.
type Endpoint struct {
// Url is the upstream URL requests are forwarded to; SubPath.handle uses it
// verbatim, without appending the sub-path.
Url string
// PreProcessor is not invoked by any code visible in this file; only the
// SubPath-level PreProcessor is called.
PreProcessor PreProcessor
// Subpaths maps the part of the request path that follows the endpoint's
// prefix (e.g. "/linear_regression/v1") to its SubPath configuration.
Subpaths map[string]SubPath
}
// handle proxies req to the parent endpoint's URL and writes the
// post-processed upstream response body to w.
//
// Steps:
//  1. Read the whole inbound body.
//  2. Build an upstream request with the same method and body, and a copy of
//     the inbound headers that PreProcessor (if set) may modify.
//  3. Send it with a 30 second timeout.
//  4. Pass the upstream body through PostProcessor (if set) and write the
//     result to w.
//
// Failures are reported to the client as an HTTP error (400 when the inbound
// body cannot be read, 500 when the upstream request cannot be built, 502
// when the upstream call or reading its response fails). They never terminate
// the server process.
func (sp *SubPath) handle(w http.ResponseWriter, req *http.Request) {
	fmt.Println(sp.Endpoint.Url)

	buf := new(strings.Builder)
	if _, err := io.Copy(buf, req.Body); err != nil {
		fmt.Printf("server: could not read request body: %s\n", err)
		http.Error(w, "could not read request body", http.StatusBadRequest)
		return
	}
	payload := buf.String()

	fmt.Println("**********")
	fmt.Println(payload)
	fmt.Println("**********")

	newreq, err := http.NewRequest(req.Method, sp.Endpoint.Url, strings.NewReader(payload))
	if err != nil {
		fmt.Printf("client: could not create request: %s\n", err)
		http.Error(w, "could not create upstream request", http.StatusInternalServerError)
		return
	}

	// Clone rather than share the map, so PreProcessor's additions do not
	// leak back into the inbound request's headers.
	newreq.Header = req.Header.Clone()
	if newreq.Header == nil {
		newreq.Header = make(http.Header)
	}
	if sp.PreProcessor != nil {
		sp.PreProcessor(&newreq.Header)
	}

	client := http.Client{
		Timeout: 30 * time.Second,
	}
	res, err := client.Do(newreq)
	if err != nil {
		fmt.Printf("client: error making http request: %s\n", err)
		http.Error(w, "upstream request failed", http.StatusBadGateway)
		return
	}
	defer res.Body.Close()

	resBody, err := ioutil.ReadAll(res.Body)
	if err != nil {
		fmt.Printf("server: could not read response body: %s\n", err)
		http.Error(w, "could not read upstream response", http.StatusBadGateway)
		return
	}

	strBody := string(resBody)
	if sp.PostProcessor != nil {
		strBody = sp.PostProcessor(strBody)
	}
	fmt.Fprint(w, strBody)
}
// main builds the routing table, registers route as the catch-all handler and
// serves HTTP on port 8090 until the listener fails.
func main() {
	setUp()
	http.HandleFunc("/", route)
	// ListenAndServe only returns on failure (e.g. the port is already in
	// use); surface that instead of exiting silently with status 0.
	log.Fatal(http.ListenAndServe(":8090", nil))
}
import json
import importlib
def run(tmp_file, script):
    """Run a handler script over a JSON document and print the result.

    Args:
        tmp_file: path of a file containing the JSON document to process.
        script: file name of the handler module, e.g. ``"handler.py"``.
            Everything from the first ``.`` onwards is dropped and the rest
            is imported by module name, so the module must be importable
            from the current ``sys.path``. It must define ``handle(body)``.

    Side effects:
        Writes the processed document as JSON to ``/tmp/perry.txt`` (debug
        aid) and prints it as JSON on stdout.
    """
    with open(tmp_file, "r") as f:
        body = json.loads(f.read())
    module = importlib.import_module(script.split(".")[0])
    preprocessed_body = module.handle(body)
    # just for debug purpose
    with open("/tmp/perry.txt", "w") as f:
        f.write(json.dumps(preprocessed_body))
    # Serialize explicitly: printing the object directly would emit Python
    # repr (single quotes, True/None), which is not valid JSON for whoever
    # reads stdout.
    print(json.dumps(preprocessed_body))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment