Skip to content

Instantly share code, notes, and snippets.

@justdoit0823
Created July 6, 2017 12:15
Show Gist options
  • Save justdoit0823/0a205f3abe2699904e6d57d6f55d1727 to your computer and use it in GitHub Desktop.
Save justdoit0823/0a205f3abe2699904e6d57d6f55d1727 to your computer and use it in GitHub Desktop.
A simple grpc test fixture in pytest.
from concurrent.futures import ThreadPoolExecutor
import grpc
import pytest
import a_pb2_grpc
from rpc.server import AGRpcServer, BGRpcServer
import b_pb2_grpc
_GRPC_TEST_ADDR = '127.0.0.1:50012'
@pytest.fixture(scope='module')
def grpc_server():
server = grpc.server(ThreadPoolExecutor(max_workers=2))
a_pb2_grpc.add_AServicer_to_server(AGRpcServer(), server)
b_pb2_grpc.add_BServicer_to_server(BGRpcServer(), server)
server.add_insecure_port(_GRPC_TEST_ADDR)
server.start()
yield server
server.stop(0)
@pytest.fixture(scope='module')
def grpc_channel():
channel = grpc.insecure_channel(_GRPC_TEST_ADDR)
return channel
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment