Created
June 22, 2024 13:16
-
-
Save costa/d3844d1758214b65daa16f43980b31b2 to your computer and use it in GitHub Desktop.
Shim component for NATS Streaming (XpoLog input)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# NOTE this is self-server comp conf; NATS is (core) "reality" comp's service
version: '2.1'
services:
  stan-xpolog:
    build: .
    # Joined to the external networks declared below so that the 'nats' and
    # 'xpolog' host names used in the environment section can be resolved.
    networks:
      - default
      - reality_default
      - xpolog_default
    environment:
      - PORT=80
      - NATS_URI=nats://nats:4222
      - STAN_CLUSTER=reality
      - STAN_CHAN=this
      - XPOLOG_URI=http://xpolog:30303/logeye/api/logger.jsp
    expose:
      - 80
    restart: on-failure
networks:
  reality_default:
    external: true
  xpolog_default:
    external: true
# NOTE 'stan-xpolog' (allocated by self server) volume is basically wasted
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
FROM python:3.9
WORKDIR /stan-xpolog
# Copy and install the requirements before the rest of the sources so this
# layer stays cached until requirements.txt itself changes.
# COPY (not ADD): these are plain local files; ADD's archive/URL handling is
# not wanted here.
COPY requirements.txt .
# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
ENV PORT=80
# NOTE works with a single process only (because of global var)... good enough!
# Shell form is deliberate: $PORT must be expanded at container start.
CMD fastapi run server.py --port $PORT --workers 1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import asyncio | |
import requests | |
from typing import Annotated | |
from fastapi import Body, FastAPI, status, responses | |
import nats.aio.client | |
import stan.aio.client | |
# Name of this component; passed to the STAN client on connect and used as the
# durable subscription name.
COMP_NAME = 'stan-xpolog'

# Mandatory settings: a missing variable raises KeyError at import time.
NATS_URI, STAN_CLUSTER, STAN_CHAN, XPOLOG_URI = (
    os.environ[var_name]
    for var_name in ('NATS_URI', 'STAN_CLUSTER', 'STAN_CHAN', 'XPOLOG_URI')
)

# Headers sent with every request to XpoLog.
JSON_HEADERS = {'Content-Type': 'application/json'}

app = FastAPI()
# Full XpoLog URL including the token; stays None (nothing is forwarded)
# until a token has been supplied through the HTTP endpoint.
app.xpolog_http_url = None
@app.post("/")
@app.put("/")
@app.patch("/")
async def set_token(token: Annotated[str, Body(embed=True)]):
    """Store the XpoLog token used when forwarding messages.

    Registered for POST, PUT and PATCH on "/". The token is read from the
    request body (``Body(embed=True)``) and ``app.xpolog_http_url`` is rebuilt
    as ``XPOLOG_URI`` plus the token as the ``token`` query parameter.

    Args:
        token: The XpoLog token, as plain (not yet URL-encoded) text.

    Returns:
        An empty response with status 204 No Content.
    """
    # Imported locally so this function does not depend on a file-level
    # import being added.
    from urllib.parse import quote

    # Percent-encode the token: characters such as '&', '#', '+', '%' or
    # spaces would otherwise truncate or corrupt the query string.
    app.xpolog_http_url = XPOLOG_URI + '?token=' + quote(token, safe='')
    return responses.Response(status_code=status.HTTP_204_NO_CONTENT)
async def mq_reactor(loop):
    """Subscribe to the STAN channel and forward each message to XpoLog.

    Connects a NATS client to ``NATS_URI`` (with ``max_reconnect_attempts=-1``),
    connects a STAN client for ``STAN_CLUSTER`` on top of it under the client
    id ``COMP_NAME``, and subscribes to ``STAN_CHAN`` with
    ``durable_name=COMP_NAME``. Every received message body is POSTed as-is to
    ``app.xpolog_http_url``; messages arriving while that URL is unset are
    skipped without being forwarded.

    Args:
        loop: The asyncio event loop; handed to the NATS client and used to
            run the blocking HTTP call in the default executor.

    Failures while forwarding a message are printed and otherwise ignored so
    that one bad message or a temporary XpoLog outage does not stop the
    subscription.
    """
    # Upper bound for a single POST to XpoLog. Without a timeout `requests`
    # may wait forever on an unresponsive server.
    http_timeout_seconds = 10

    def post_to_xpolog(url, payload):
        # Synchronous helper, executed in a worker thread (see cb below).
        return requests.post(
            url,
            headers=JSON_HEADERS,
            data=payload,
            timeout=http_timeout_seconds,
        )

    nats_client = nats.aio.client.Client()
    await nats_client.connect(NATS_URI, max_reconnect_attempts=-1, io_loop=loop)
    stan_client = stan.aio.client.Client()
    await stan_client.connect(STAN_CLUSTER, COMP_NAME, nats=nats_client)

    async def cb(msg):
        # Snapshot the URL once so the request and the log line agree even if
        # the token is replaced while the request is in flight.
        url = app.xpolog_http_url
        if not url:
            return
        try:
            # `requests` is blocking; running it in the executor keeps the
            # event loop (and therefore the HTTP endpoint) responsive.
            response = await loop.run_in_executor(
                None, post_to_xpolog, url, msg.data
            )
            if response.status_code != 200:
                # Log the raw body: an error response is not guaranteed to be
                # JSON, and decoding it could itself raise.
                print("ERROR", response.status_code, response.text,
                      "HTTPing", msg.data, "to", url)
        except Exception as e:
            print("ERROR", e)  # NOTE and soldier on...

    await stan_client.subscribe(STAN_CHAN, durable_name=COMP_NAME, cb=cb)
@app.on_event('startup')
async def startup():
    """Start the message-queue reactor when the application starts.

    Schedules ``mq_reactor`` as a task on the running event loop and waits
    for it to finish, so any exception it raises surfaces from this startup
    handler.
    """
    # NOTE(review): newer FastAPI releases appear to favour lifespan handlers
    # over on_event - confirm before upgrading.
    running_loop = asyncio.get_running_loop()
    reactor_task = running_loop.create_task(mq_reactor(running_loop))
    await reactor_task
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment