Skip to content

Instantly share code, notes, and snippets.

@pgmrDohan
Last active September 9, 2025 05:53
Show Gist options
  • Select an option

  • Save pgmrDohan/b0a50006f32c7135a4c7f9d6a34fe3d7 to your computer and use it in GitHub Desktop.

Select an option

Save pgmrDohan/b0a50006f32c7135a4c7f9d6a34fe3d7 to your computer and use it in GitHub Desktop.
TRNG API Using Arduino Uno & FastAPI on Raspberry Pi
import asyncio
import base64
import logging
import os
import signal
import threading
import time
from typing import AsyncGenerator, Optional
import serial
from fastapi import FastAPI, HTTPException, Request, Response
from fastapi.responses import StreamingResponse, PlainTextResponse
# ---------- Config ----------
# Serial device and baud rate; both can be overridden through environment
# variables (TRNG_SERIAL_PORT / TRNG_SERIAL_BAUD).
SERIAL_PORT = os.environ.get("TRNG_SERIAL_PORT", "/dev/serial0")
SERIAL_BAUD = int(os.environ.get("TRNG_SERIAL_BAUD", "9600"))
# Used as ``maxsize`` of the asyncio queue. Queue items are the chunks returned
# by each serial read, so this bounds the number of chunks, not bytes.
QUEUE_MAX = int(os.environ.get("TRNG_QUEUE_MAX", "65536"))
# Maximum number of bytes requested from the serial port per read.
READ_CHUNK = 32
# Default number of seconds a request waits for enough random bytes.
DEFAULT_TIMEOUT = 5.0
# ---------- Logging ----------
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("trng-api")
# ---------- App ----------
app = FastAPI(title="TRNG API", version="0.1")
# ---------- Global variables ----------
# Chunks read from the serial port; created in the startup handler so that it
# is bound to the running event loop.
bytes_queue: Optional[asyncio.Queue] = None
# Tail of a chunk that was only partially consumed by a previous request.
remaining_bytes = bytearray()
# Serialises access to ``remaining_bytes``; created in the startup handler.
remaining_lock: Optional[asyncio.Lock] = None
# Background reader thread; created and started in the startup handler.
serial_reader: Optional['SerialReader'] = None
# ---------- Serial reader thread ----------
class SerialReader(threading.Thread):
    """Daemon thread that pumps raw bytes from the serial port into the queue.

    Every non-empty read is handed to the asyncio event loop registered via
    :meth:`set_loop`, where it is appended to the module-level
    ``bytes_queue``. The thread keeps retrying until the port can be opened
    and exits once :meth:`stop` has been called or a read fails.
    """

    def __init__(self, port: str, baud: int, chunk: int = 32):
        """Store the connection parameters; the port is opened in :meth:`run`.

        Args:
            port: Serial device path.
            baud: Baud rate.
            chunk: Maximum number of bytes requested per read.
        """
        super().__init__(daemon=True)
        self.port = port
        self.baud = baud
        self.chunk = chunk
        # Deliberately NOT named ``_stop``: threading.Thread has an internal
        # ``_stop()`` method (CPython <= 3.12) that join()/is_alive() call.
        # Shadowing it with an Event makes those calls fail with
        # "TypeError: 'Event' object is not callable".
        self._stop_event = threading.Event()
        self.ser: Optional[serial.Serial] = None
        self._loop = None

    def set_loop(self, loop):
        """Register the event loop that owns ``bytes_queue``."""
        self._loop = loop

    def stop(self):
        """Ask the thread to exit and close the port (best effort)."""
        self._stop_event.set()
        ser = self.ser
        try:
            if ser is not None and ser.is_open:
                ser.close()
        except Exception:
            # Closing is best effort; the reader thread closes again on exit.
            log.debug("Ignoring error while closing serial port", exc_info=True)

    @staticmethod
    def _enqueue(queue, data: bytes) -> None:
        """Append *data* to *queue*; executed on the event-loop thread.

        ``put_nowait`` raises ``QueueFull`` inside the loop, where the reader
        thread cannot catch it, so it has to be handled here.
        """
        try:
            queue.put_nowait(data)
        except asyncio.QueueFull:
            log.warning(f"queue put failed: queue full, dropped {len(data)} bytes")

    def run(self):
        """Open the port (retrying every 2 s) and forward data until stopped."""
        log.info(f"SerialReader starting: {self.port} @ {self.baud}")
        while not self._stop_event.is_set():
            try:
                self.ser = serial.Serial(self.port, self.baud, timeout=1.0)
                log.info("Serial opened")
                break
            except Exception as e:
                log.warning(f"Failed to open serial {self.port}: {e}. Retrying in 2s")
                # Event.wait instead of time.sleep so stop() ends the back-off
                # immediately.
                self._stop_event.wait(2.0)
        if self.ser is None:
            return
        try:
            while not self._stop_event.is_set():
                data = self.ser.read(self.chunk)
                if not data:
                    # Read timed out without data; yield briefly and retry.
                    self._stop_event.wait(0.001)
                    continue
                loop = self._loop
                queue = bytes_queue
                if loop is None or queue is None:
                    continue
                try:
                    # asyncio.Queue is not thread-safe: mutate it on its loop.
                    loop.call_soon_threadsafe(self._enqueue, queue, data)
                except RuntimeError as e:
                    # Raised when the loop has already been closed.
                    log.warning(f"queue put failed: {e}")
        except Exception:
            if self._stop_event.is_set():
                # stop() closes the port under us; a failing read is expected.
                log.info("Serial read interrupted by shutdown")
            else:
                log.exception("SerialReader error")
        finally:
            try:
                if self.ser:
                    self.ser.close()
            except Exception:
                log.debug("Ignoring error while closing serial port", exc_info=True)
            log.info("SerialReader stopped")
# ---------- Helper: collect exactly n bytes ----------
async def get_n_bytes(n: int, timeout: float = DEFAULT_TIMEOUT) -> bytes:
    """Collect exactly *n* random bytes from the leftover buffer and the queue.

    Args:
        n: Number of bytes wanted; values <= 0 yield ``b""``.
        timeout: Overall number of seconds to wait for the bytes.

    Returns:
        Exactly *n* bytes.

    Raises:
        HTTPException: 503 if the queue/lock have not been created yet,
            504 if *n* bytes could not be gathered before the deadline.
    """
    if n <= 0:
        return b""
    if bytes_queue is None or remaining_lock is None:
        raise HTTPException(status_code=503, detail="Service not ready")
    loop = asyncio.get_running_loop()
    async with remaining_lock:
        deadline = loop.time() + timeout
        # Start with whatever a previous request left over. The buffer is
        # mutated in place, so no ``global`` rebinding is needed.
        out = bytearray(remaining_bytes[:n])
        del remaining_bytes[:n]
        try:
            # Pull further chunks from the queue until we have enough.
            while len(out) < n:
                remaining_time = deadline - loop.time()
                if remaining_time <= 0:
                    raise asyncio.TimeoutError()
                chunk: bytes = await asyncio.wait_for(
                    bytes_queue.get(), timeout=remaining_time
                )
                need = n - len(out)
                out.extend(chunk[:need])
                # Keep the unused tail of the chunk for the next caller.
                remaining_bytes.extend(chunk[need:])
        except asyncio.TimeoutError:
            # Do not throw away the bytes gathered so far: put them back at
            # the front so the next request can use them.
            remaining_bytes[:0] = out
            raise HTTPException(
                status_code=504, detail=f"Timeout while waiting for {n} bytes"
            )
        except asyncio.CancelledError:
            remaining_bytes[:0] = out
            raise
        return bytes(out)
# ---------- API endpoints ----------
@app.get("/health")
async def health():
    """Report readiness plus the current queue and leftover-buffer sizes."""
    queue = bytes_queue
    if not queue:
        return {"status": "not_ready", "error": "Queue not initialized"}
    report = {"status": "ok"}
    report["queue_size"] = queue.qsize()
    report["remaining_bytes"] = len(remaining_bytes)
    report["serial_port"] = SERIAL_PORT
    report["baud"] = SERIAL_BAUD
    return report
@app.get("/random")
async def random_bytes(n: int = 16, fmt: str = "hex", timeout: float = DEFAULT_TIMEOUT):
    """Return *n* random bytes encoded as requested.

    Args:
        n: Number of bytes, 1..10_000_000.
        fmt: ``"hex"``, ``"b64"`` or ``"raw"``.
        timeout: Seconds to wait for the bytes to become available.

    Raises:
        HTTPException: 400 for an out-of-range *n* or an unknown *fmt*;
            whatever ``get_n_bytes`` raises otherwise.
    """
    if n <= 0 or n > 10_000_000:
        raise HTTPException(status_code=400, detail="n out of range")
    # Validate the format BEFORE touching the generator: otherwise a bad
    # request waits for, and then throws away, n hardware random bytes.
    if fmt not in ("hex", "b64", "raw"):
        raise HTTPException(status_code=400, detail="unknown fmt")
    data = await get_n_bytes(n, timeout=timeout)
    if fmt == "hex":
        return PlainTextResponse(data.hex())
    if fmt == "b64":
        return PlainTextResponse(base64.b64encode(data).decode())
    return Response(content=data, media_type="application/octet-stream")
async def sse_generator() -> AsyncGenerator[str, None]:
    """Yield queue chunks as base64-encoded Server-Sent Events, forever.

    Ends immediately if the queue has not been created yet.
    """
    if bytes_queue is None:
        return
    try:
        while True:
            chunk: bytes = await bytes_queue.get()
            payload = base64.b64encode(chunk).decode()
            yield f"data: {payload}\n\n"
    except asyncio.CancelledError:
        log.info("SSE client disconnected")
        # Cancellation must propagate so the enclosing task really ends;
        # swallowing it hides the cancel from the framework.
        raise
@app.get("/stream")
async def stream(request: Request):
    """Stream random chunks to the client as Server-Sent Events."""
    events = sse_generator()
    response = StreamingResponse(events, media_type="text/event-stream")
    return response
# ---------- Startup / shutdown handlers ----------
@app.on_event("startup")
async def on_startup():
    """Create the queue and lock on the running loop and start the reader."""
    global bytes_queue, remaining_lock, serial_reader
    # Create the queue and lock while the server's event loop is running.
    bytes_queue = asyncio.Queue(maxsize=QUEUE_MAX)
    remaining_lock = asyncio.Lock()
    # Start the SerialReader. get_running_loop() is the documented way to
    # obtain the current loop from inside a coroutine.
    serial_reader = SerialReader(SERIAL_PORT, SERIAL_BAUD, chunk=READ_CHUNK)
    serial_reader.set_loop(asyncio.get_running_loop())
    serial_reader.start()
    log.info("Serial reader thread launched")
@app.on_event("shutdown")
async def on_shutdown():
    """Signal the serial reader thread (if one was started) to stop."""
    reader = serial_reader
    if reader:
        reader.stop()
    log.info("Shutdown complete")
def _signal_handler(sig, frame):
    """Stop the serial reader, then terminate the process with exit code 0."""
    log.info("Signal received, shutting down")
    reader = serial_reader
    if reader:
        reader.stop()
    raise SystemExit(0)
# signal.signal() may only be called from the main thread and raises
# ValueError anywhere else, which would make importing this module fail when
# the app is loaded from a worker thread. Register only when it is allowed.
if threading.current_thread() is threading.main_thread():
    signal.signal(signal.SIGINT, _signal_handler)
    signal.signal(signal.SIGTERM, _signal_handler)
else:
    log.warning("Not in main thread; signal handlers not installed")
#include <Arduino.h>
#include <avr/wdt.h>
#include <avr/interrupt.h>
#include <SoftwareSerial.h>
// SoftwareSerial for Raspberry Pi communication
// Pin 6: RX, Pin 7: TX
SoftwareSerial rasp(6, 7);
// Buffer size: number of random bytes collected before each transmission
#define BUFFER_SIZE 32
// Latest sample and its "new sample available" flag. Both are written by the
// WDT interrupt handler and read by the main code, hence volatile.
volatile uint8_t trng_sample = 0;
volatile bool trng_sample_ready = false;
// xorshift state (whitening)
static uint32_t trng_state = 0xA5A5A5A5UL;
// Output buffer
uint8_t output_buffer[BUFFER_SIZE];
// Statistics counters: bytes sent since the last report and the millis()
// timestamp of that report
unsigned long bytes_sent = 0;
unsigned long last_report_time = 0;
// Timer1 초기화: prescaler 1로 16MHz에서 직접 카운트
void trng_init_timer1() {
// Timer1을 normal mode, prescaler=1로 설정
TCCR1A = 0; // Normal mode
TCCR1B = (1 << CS10); // prescaler = 1 (16MHz 직접 카운트)
TCNT1 = 0; // 카운터 초기화
}
// WDT initialization: 16 ms interrupt mode
void trng_init_wdt() {
    cli(); // disable interrupts while the watchdog is reconfigured
    wdt_reset();
    // Clear the watchdog-reset flag.
    MCUSR &= ~(1 << WDRF);
    // WDT setup: 16 ms interrupt (all WDP bits 0)
    // First write sets WDCE and WDE; the next write then stores the new
    // configuration (the watchdog's timed change sequence -- see the AVR
    // datasheet).
    WDTCSR |= (1 << WDCE) | (1 << WDE);
    WDTCSR = (1 << WDIE); // only WDIE set -> 16 ms interrupt, no reset
    sei();
}
// WDT interrupt: sample the Timer1 low byte
ISR(WDT_vect) {
    // Latch the low byte of the free-running Timer1 counter, then flag it so
    // the main code knows a fresh sample is available.
    trng_sample = TCNT1L;
    trng_sample_ready = true;
}
// Build one byte from the LSBs of eight consecutive samples, then whiten it.
// Blocks (busy-waits) until the WDT interrupt has delivered all eight samples.
uint8_t trng_get_byte_blocking() {
    uint8_t assembled = 0;
    uint8_t bits_left = 8;

    while (bits_left-- > 0) {
        // Spin until the WDT interrupt publishes a new sample.
        while (!trng_sample_ready) {
        }
        const uint8_t sample = trng_sample;
        trng_sample_ready = false;

        // Shift the sample's least significant bit into the result.
        assembled = (assembled << 1) | (sample & 0x01);
    }

    // xorshift32 whitening: fold the raw byte into the state, then scramble.
    uint32_t mixed = trng_state ^ (uint32_t)assembled;
    mixed ^= (mixed << 13);
    mixed ^= (mixed >> 17);
    mixed ^= (mixed << 5);
    trng_state = mixed;

    return (uint8_t)(mixed & 0xFF);
}
// One-time initialisation: serial links first, then Timer1, then the WDT.
void setup() {
    // Hardware serial (for debugging)
    Serial.begin(9600);
    Serial.println(F("TRNG 초기화 시작"));
    // SoftwareSerial (link to the Raspberry Pi) - 9600 baud
    rasp.begin(9600);
    delay(100);
    // Timer1 initialisation (must happen before the WDT is started)
    trng_init_timer1();
    Serial.println(F("Timer1 시작 (prescaler=1)"));
    // WDT initialisation (16 ms interrupt)
    trng_init_wdt();
    Serial.println(F("WDT 16ms 인터럽트 시작"));
    Serial.println(F("TRNG 준비 완료"));
    // Start of the first statistics interval.
    last_report_time = millis();
}
// Main loop: fill the buffer with random bytes, send it to the Raspberry Pi,
// and print throughput statistics on the debug serial port.
void loop() {
    // Fill the buffer
    for (int i = 0; i < BUFFER_SIZE; i++) {
        output_buffer[i] = trng_get_byte_blocking();
    }
    // Send to the Raspberry Pi
    rasp.write(output_buffer, BUFFER_SIZE);
    bytes_sent += BUFFER_SIZE;
    // Report statistics once at least one second has passed
    unsigned long current_time = millis();
    unsigned long elapsed_ms = current_time - last_report_time;
    if (elapsed_ms >= 1000) {
        Serial.print(F("전송: "));
        // Filling one buffer takes far longer than a second (32 bytes x 8
        // samples at one sample per WDT tick), so the raw counter is not a
        // per-second figure. Normalise by the real elapsed time so the
        // "bytes/s" label is correct.
        Serial.print((float)bytes_sent * 1000.0f / (float)elapsed_ms, 1);
        Serial.print(F(" bytes/s, 샘플: "));
        // Show the first four bytes of the buffer as zero-padded hex.
        for (int i = 0; i < 4; i++) {
            if (output_buffer[i] < 16) Serial.print('0');
            Serial.print(output_buffer[i], HEX);
            Serial.print(' ');
        }
        Serial.println();
        last_report_time = current_time;
        bytes_sent = 0;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment