Last active
September 9, 2025 05:53
-
-
Save pgmrDohan/b0a50006f32c7135a4c7f9d6a34fe3d7 to your computer and use it in GitHub Desktop.
TRNG API Using Arduino Uno & FastAPI on Raspberry Pi
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import base64 | |
| import logging | |
| import os | |
| import signal | |
| import threading | |
| import time | |
| from typing import AsyncGenerator, Optional | |
| import serial | |
| from fastapi import FastAPI, HTTPException, Request, Response | |
| from fastapi.responses import StreamingResponse, PlainTextResponse | |
| # ---------- Config ---------- | |
| SERIAL_PORT = os.environ.get("TRNG_SERIAL_PORT", "/dev/serial0") | |
| SERIAL_BAUD = int(os.environ.get("TRNG_SERIAL_BAUD", "9600")) | |
| QUEUE_MAX = int(os.environ.get("TRNG_QUEUE_MAX", "65536")) | |
| READ_CHUNK = 32 | |
| DEFAULT_TIMEOUT = 5.0 | |
| # ---------- Logging ---------- | |
| logging.basicConfig(level=logging.INFO) | |
| log = logging.getLogger("trng-api") | |
| # ---------- App ---------- | |
| app = FastAPI(title="TRNG API", version="0.1") | |
| # ---------- Global variables ---------- | |
| bytes_queue: Optional[asyncio.Queue] = None | |
| remaining_bytes = bytearray() | |
| remaining_lock: Optional[asyncio.Lock] = None | |
| serial_reader: Optional['SerialReader'] = None | |
| # ---------- Serial reader thread ---------- | |
| class SerialReader(threading.Thread): | |
| def __init__(self, port: str, baud: int, chunk: int = 32): | |
| super().__init__(daemon=True) | |
| self.port = port | |
| self.baud = baud | |
| self.chunk = chunk | |
| self._stop = threading.Event() | |
| self.ser: Optional[serial.Serial] = None | |
| self._loop = None | |
| def set_loop(self, loop): | |
| self._loop = loop | |
| def stop(self): | |
| self._stop.set() | |
| try: | |
| if self.ser and self.ser.is_open: | |
| self.ser.close() | |
| except Exception: | |
| pass | |
| def run(self): | |
| log.info(f"SerialReader starting: {self.port} @ {self.baud}") | |
| while not self._stop.is_set(): | |
| try: | |
| self.ser = serial.Serial(self.port, self.baud, timeout=1.0) | |
| log.info("Serial opened") | |
| break | |
| except Exception as e: | |
| log.warning(f"Failed to open serial {self.port}: {e}. Retrying in 2s") | |
| time.sleep(2.0) | |
| if self.ser is None: | |
| return | |
| try: | |
| while not self._stop.is_set(): | |
| data = self.ser.read(self.chunk) | |
| if data and self._loop and bytes_queue: | |
| try: | |
| self._loop.call_soon_threadsafe(bytes_queue.put_nowait, data) | |
| except Exception as e: | |
| log.warning(f"queue put failed: {e}") | |
| elif not data: | |
| time.sleep(0.001) | |
| except Exception as e: | |
| log.exception("SerialReader error") | |
| finally: | |
| try: | |
| if self.ser: | |
| self.ser.close() | |
| except Exception: | |
| pass | |
| log.info("SerialReader stopped") | |
| # ---------- Helper: collect exactly n bytes ---------- | |
| async def get_n_bytes(n: int, timeout: float = DEFAULT_TIMEOUT) -> bytes: | |
| global remaining_bytes | |
| if n <= 0: | |
| return b"" | |
| if not bytes_queue or not remaining_lock: | |
| raise HTTPException(status_code=503, detail="Service not ready") | |
| async with remaining_lock: | |
| out = bytearray() | |
| deadline = asyncio.get_event_loop().time() + timeout | |
| # 먼저 남은 바이트 확인 | |
| if remaining_bytes: | |
| if len(remaining_bytes) >= n: | |
| out.extend(remaining_bytes[:n]) | |
| remaining_bytes = remaining_bytes[n:] | |
| return bytes(out) | |
| else: | |
| out.extend(remaining_bytes) | |
| remaining_bytes.clear() | |
| # 추가 바이트를 큐에서 가져오기 | |
| while len(out) < n: | |
| try: | |
| remaining_time = deadline - asyncio.get_event_loop().time() | |
| if remaining_time <= 0: | |
| raise asyncio.TimeoutError() | |
| chunk: bytes = await asyncio.wait_for(bytes_queue.get(), timeout=remaining_time) | |
| need = n - len(out) | |
| if len(chunk) <= need: | |
| out.extend(chunk) | |
| else: | |
| out.extend(chunk[:need]) | |
| remaining_bytes.extend(chunk[need:]) | |
| except asyncio.TimeoutError: | |
| raise HTTPException(status_code=504, detail=f"Timeout while waiting for {n} bytes") | |
| return bytes(out) | |
| # ---------- API endpoints ---------- | |
| @app.get("/health") | |
| async def health(): | |
| if not bytes_queue: | |
| return {"status": "not_ready", "error": "Queue not initialized"} | |
| qsize = bytes_queue.qsize() | |
| remaining_count = len(remaining_bytes) | |
| return { | |
| "status": "ok", | |
| "queue_size": qsize, | |
| "remaining_bytes": remaining_count, | |
| "serial_port": SERIAL_PORT, | |
| "baud": SERIAL_BAUD | |
| } | |
| @app.get("/random") | |
| async def random_bytes(n: int = 16, fmt: str = "hex", timeout: float = DEFAULT_TIMEOUT): | |
| if n <= 0 or n > 10_000_000: | |
| raise HTTPException(status_code=400, detail="n out of range") | |
| data = await get_n_bytes(n, timeout=timeout) | |
| if fmt == "hex": | |
| return PlainTextResponse(data.hex()) | |
| elif fmt == "b64": | |
| return PlainTextResponse(base64.b64encode(data).decode()) | |
| elif fmt == "raw": | |
| return Response(content=data, media_type="application/octet-stream") | |
| else: | |
| raise HTTPException(status_code=400, detail="unknown fmt") | |
| async def sse_generator() -> AsyncGenerator[str, None]: | |
| if not bytes_queue: | |
| return | |
| try: | |
| while True: | |
| chunk: bytes = await bytes_queue.get() | |
| payload = base64.b64encode(chunk).decode() | |
| yield f"data: {payload}\n\n" | |
| except asyncio.CancelledError: | |
| log.info("SSE client disconnected") | |
| return | |
| @app.get("/stream") | |
| async def stream(request: Request): | |
| return StreamingResponse(sse_generator(), media_type="text/event-stream") | |
| # ---------- Startup / shutdown handlers ---------- | |
| @app.on_event("startup") | |
| async def on_startup(): | |
| global bytes_queue, remaining_lock, serial_reader | |
| # 현재 실행 중인 이벤트 루프에서 큐와 락 초기화 | |
| bytes_queue = asyncio.Queue(maxsize=QUEUE_MAX) | |
| remaining_lock = asyncio.Lock() | |
| # SerialReader 시작 | |
| serial_reader = SerialReader(SERIAL_PORT, SERIAL_BAUD, chunk=READ_CHUNK) | |
| serial_reader.set_loop(asyncio.get_event_loop()) | |
| serial_reader.start() | |
| log.info("Serial reader thread launched") | |
| @app.on_event("shutdown") | |
| async def on_shutdown(): | |
| if serial_reader: | |
| serial_reader.stop() | |
| log.info("Shutdown complete") | |
| def _signal_handler(sig, frame): | |
| log.info("Signal received, shutting down") | |
| if serial_reader: | |
| serial_reader.stop() | |
| raise SystemExit(0) | |
| signal.signal(signal.SIGINT, _signal_handler) | |
| signal.signal(signal.SIGTERM, _signal_handler) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #include <Arduino.h> | |
| #include <avr/wdt.h> | |
| #include <avr/interrupt.h> | |
| #include <SoftwareSerial.h> | |
| // SoftwareSerial for Raspberry Pi communication | |
| // Pin 6: RX, Pin 7: TX | |
| SoftwareSerial rasp(6, 7); | |
| // 버퍼 크기 | |
| #define BUFFER_SIZE 32 | |
| // 샘플 저장 | |
| volatile uint8_t trng_sample = 0; | |
| volatile bool trng_sample_ready = false; | |
| // xorshift 상태 (whitening) | |
| static uint32_t trng_state = 0xA5A5A5A5UL; | |
| // 출력 버퍼 | |
| uint8_t output_buffer[BUFFER_SIZE]; | |
| // 통계 카운터 | |
| unsigned long bytes_sent = 0; | |
| unsigned long last_report_time = 0; | |
| // Timer1 초기화: prescaler 1로 16MHz에서 직접 카운트 | |
| void trng_init_timer1() { | |
| // Timer1을 normal mode, prescaler=1로 설정 | |
| TCCR1A = 0; // Normal mode | |
| TCCR1B = (1 << CS10); // prescaler = 1 (16MHz 직접 카운트) | |
| TCNT1 = 0; // 카운터 초기화 | |
| } | |
| // WDT 초기화: 16ms 인터럽트 모드 | |
| void trng_init_wdt() { | |
| cli(); // 인터럽트 비활성화 | |
| wdt_reset(); | |
| MCUSR &= ~(1 << WDRF); | |
| // WDT 설정: 16ms 인터럽트 (WDP 비트 모두 0) | |
| WDTCSR |= (1 << WDCE) | (1 << WDE); | |
| WDTCSR = (1 << WDIE); // WDIE만 설정 -> 16ms 인터럽트 | |
| sei(); | |
| } | |
| // WDT 인터럽트: Timer1 low byte 샘플링 | |
| ISR(WDT_vect) { | |
| trng_sample = TCNT1L; | |
| trng_sample_ready = true; | |
| } | |
| // 8번 샘플을 모아 1바이트 생성 + whitening | |
| uint8_t trng_get_byte_blocking() { | |
| uint8_t out = 0; | |
| for (uint8_t i = 0; i < 8; ++i) { | |
| while (!trng_sample_ready) { | |
| // WDT 인터럽트 대기 | |
| } | |
| uint8_t s = trng_sample; | |
| trng_sample_ready = false; | |
| // LSB 추출 | |
| uint8_t bit = s & 0x01; | |
| out = (out << 1) | bit; | |
| } | |
| // xorshift32 whitening | |
| trng_state ^= (uint32_t)out; | |
| trng_state ^= (trng_state << 13); | |
| trng_state ^= (trng_state >> 17); | |
| trng_state ^= (trng_state << 5); | |
| return (uint8_t)(trng_state & 0xFF); | |
| } | |
| void setup() { | |
| // 하드웨어 시리얼 (디버깅용) | |
| Serial.begin(9600); | |
| Serial.println(F("TRNG 초기화 시작")); | |
| // SoftwareSerial (라즈베리파이 통신용) - 9600 baud | |
| rasp.begin(9600); | |
| delay(100); | |
| // Timer1 초기화 (반드시 WDT 이전에) | |
| trng_init_timer1(); | |
| Serial.println(F("Timer1 시작 (prescaler=1)")); | |
| // WDT 초기화 (16ms 인터럽트) | |
| trng_init_wdt(); | |
| Serial.println(F("WDT 16ms 인터럽트 시작")); | |
| Serial.println(F("TRNG 준비 완료")); | |
| last_report_time = millis(); | |
| } | |
| void loop() { | |
| // 버퍼 채우기 | |
| for (int i = 0; i < BUFFER_SIZE; i++) { | |
| output_buffer[i] = trng_get_byte_blocking(); | |
| } | |
| // 라즈베리파이로 전송 | |
| rasp.write(output_buffer, BUFFER_SIZE); | |
| bytes_sent += BUFFER_SIZE; | |
| // 1초마다 통계 출력 | |
| unsigned long current_time = millis(); | |
| if (current_time - last_report_time >= 1000) { | |
| Serial.print(F("전송: ")); | |
| Serial.print(bytes_sent); | |
| Serial.print(F(" bytes/s, 샘플: ")); | |
| for (int i = 0; i < 4; i++) { | |
| if (output_buffer[i] < 16) Serial.print('0'); | |
| Serial.print(output_buffer[i], HEX); | |
| Serial.print(' '); | |
| } | |
| Serial.println(); | |
| last_report_time = current_time; | |
| bytes_sent = 0; | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment