Last active
November 21, 2020 08:32
-
-
Save w4rum/1f749149cafeec11c863bd625e62a5e9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
syntax = "proto3"; | |
package example_service; | |
// Example service exposing one RPC for each combination of unary and
// streaming request/response.
service ExampleService {
  // One request message in, one response message out.
  rpc ExampleUnaryUnary(ExampleRequest) returns (ExampleResponse);

  // One request message in, a stream of response messages out.
  rpc ExampleUnaryStream(ExampleRequest) returns (stream ExampleResponse);

  // A stream of request messages in, one response message out.
  rpc ExampleStreamUnary(stream ExampleRequest) returns (ExampleResponse);

  // A stream of request messages in, a stream of response messages out.
  rpc ExampleStreamStream(stream ExampleRequest) returns (stream ExampleResponse);
}
// Request payload accepted by every RPC of ExampleService.
message ExampleRequest {
  // String payload (field number 1).
  string example_string = 1;

  // 64-bit signed integer payload (field number 2).
  int64 example_integer = 2;
}
// Response payload returned by every RPC of ExampleService.
message ExampleResponse {
  // String payload (field number 1).
  string example_string = 1;

  // 64-bit signed integer payload (field number 2).
  int64 example_integer = 2;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Generated by the protocol buffer compiler. DO NOT EDIT! | |
# sources: example_service.proto | |
# plugin: python-betterproto | |
from dataclasses import dataclass | |
from typing import AsyncIterable, AsyncIterator, Dict, Iterable, Union | |
import betterproto | |
import grpclib | |
@dataclass(eq=False, repr=False)
class ExampleRequest(betterproto.Message):
    """Python counterpart of the ``ExampleRequest`` proto message.

    ``eq`` and ``repr`` generation are switched off on the dataclass
    decorator, so those behaviours come from ``betterproto.Message``.
    """

    # Declared as proto field number 1 via ``betterproto.string_field``.
    example_string: str = betterproto.string_field(1)
    # Declared as proto field number 2 via ``betterproto.int64_field``.
    example_integer: int = betterproto.int64_field(2)

    def __post_init__(self) -> None:
        """Run the base-class post-init hook; no extra work is done here."""
        super().__post_init__()
@dataclass(eq=False, repr=False)
class ExampleResponse(betterproto.Message):
    """Python counterpart of the ``ExampleResponse`` proto message.

    ``eq`` and ``repr`` generation are switched off on the dataclass
    decorator, so those behaviours come from ``betterproto.Message``.
    """

    # Declared as proto field number 1 via ``betterproto.string_field``.
    example_string: str = betterproto.string_field(1)
    # Declared as proto field number 2 via ``betterproto.int64_field``.
    example_integer: int = betterproto.int64_field(2)

    def __post_init__(self) -> None:
        """Run the base-class post-init hook; no extra work is done here."""
        super().__post_init__()
class ExampleServiceStub(betterproto.ServiceStub):
    """Client-side stub for ``example_service.ExampleService``.

    Every method forwards to the matching ``_unary_unary`` /
    ``_unary_stream`` / ``_stream_unary`` / ``_stream_stream`` helper on
    ``self``, passing the fully qualified route of the RPC and the
    message classes involved.
    """

    async def example_unary_unary(
        self, *, example_string: str = "", example_integer: int = 0
    ) -> "ExampleResponse":
        """Call ``ExampleUnaryUnary`` with a request built from the keywords.

        Args:
            example_string: Value assigned to the request's ``example_string``.
            example_integer: Value assigned to the request's ``example_integer``.

        Returns:
            Whatever ``self._unary_unary`` resolves to for this route.
        """
        # Populate field by field on an empty message, exactly as generated.
        message = ExampleRequest()
        message.example_string = example_string
        message.example_integer = example_integer

        route = "/example_service.ExampleService/ExampleUnaryUnary"
        reply = await self._unary_unary(route, message, ExampleResponse)
        return reply

    async def example_unary_stream(
        self, *, example_string: str = "", example_integer: int = 0
    ) -> AsyncIterator["ExampleResponse"]:
        """Call ``ExampleUnaryStream`` and re-yield each streamed item.

        Args:
            example_string: Value assigned to the request's ``example_string``.
            example_integer: Value assigned to the request's ``example_integer``.

        Yields:
            Each item produced by ``self._unary_stream`` for this route.
        """
        message = ExampleRequest()
        message.example_string = example_string
        message.example_integer = example_integer

        route = "/example_service.ExampleService/ExampleUnaryStream"
        async for item in self._unary_stream(route, message, ExampleResponse):
            yield item

    async def example_stream_unary(
        self,
        request_iterator: Union[
            AsyncIterable["ExampleRequest"], Iterable["ExampleRequest"]
        ],
    ) -> "ExampleResponse":
        """Call ``ExampleStreamUnary`` with an iterable of requests.

        Args:
            request_iterator: Sync or async iterable of ``ExampleRequest``
                objects, handed unchanged to ``self._stream_unary``.

        Returns:
            Whatever ``self._stream_unary`` resolves to for this route.
        """
        route = "/example_service.ExampleService/ExampleStreamUnary"
        reply = await self._stream_unary(
            route, request_iterator, ExampleRequest, ExampleResponse
        )
        return reply

    async def example_stream_stream(
        self,
        request_iterator: Union[
            AsyncIterable["ExampleRequest"], Iterable["ExampleRequest"]
        ],
    ) -> AsyncIterator["ExampleResponse"]:
        """Call ``ExampleStreamStream`` and re-yield each streamed item.

        Args:
            request_iterator: Sync or async iterable of ``ExampleRequest``
                objects, handed unchanged to ``self._stream_stream``.

        Yields:
            Each item produced by ``self._stream_stream`` for this route.
        """
        route = "/example_service.ExampleService/ExampleStreamStream"
        async for item in self._stream_stream(
            route, request_iterator, ExampleRequest, ExampleResponse
        ):
            yield item
class ExampleServiceImplementation(betterproto.ServiceImplementation):
    """Server-side skeleton for ``example_service.ExampleService``.

    The four public ``example_*`` coroutines are the default handlers; each
    one raises ``grpclib.GRPCError`` with ``Status.UNIMPLEMENTED``
    (presumably they are meant to be overridden in a subclass). The private
    ``__rpc_*`` coroutines adapt a grpclib stream to those handlers, and
    ``__mapping__`` ties each route to its adapter.
    """

    async def example_unary_unary(
        self, example_string: str, example_integer: int
    ) -> "ExampleResponse":
        """Default ``ExampleUnaryUnary`` handler: always raises UNIMPLEMENTED."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def example_unary_stream(
        self, example_string: str, example_integer: int
    ) -> AsyncIterator["ExampleResponse"]:
        """Default ``ExampleUnaryStream`` handler: always raises UNIMPLEMENTED."""
        # NOTE(review): the body has no ``yield``, so calling this returns a
        # coroutine rather than an async generator -- confirm the stream
        # handler helper copes with that.
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def example_stream_unary(
        self, example_request_iterator: AsyncIterable["ExampleRequest"]
    ) -> "ExampleResponse":
        """Default ``ExampleStreamUnary`` handler: always raises UNIMPLEMENTED."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def example_stream_stream(
        self, example_request_iterator: AsyncIterable["ExampleRequest"]
    ) -> AsyncIterator["ExampleResponse"]:
        """Default ``ExampleStreamStream`` handler: always raises UNIMPLEMENTED."""
        # NOTE(review): the body has no ``yield``, so calling this returns a
        # coroutine rather than an async generator -- confirm the stream
        # handler helper copes with that.
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def __rpc_example_unary_unary(self, stream):
        """Receive one message, unpack its fields and run the unary handler."""
        incoming = await stream.recv_message()
        handler_kwargs = dict(
            example_string=incoming.example_string,
            example_integer=incoming.example_integer,
        )
        await self._call_rpc_handler_server_unary(
            self.example_unary_unary, stream, handler_kwargs
        )

    async def __rpc_example_unary_stream(self, stream):
        """Receive one message, unpack its fields and run the stream handler."""
        incoming = await stream.recv_message()
        handler_kwargs = dict(
            example_string=incoming.example_string,
            example_integer=incoming.example_integer,
        )
        await self._call_rpc_handler_server_stream(
            self.example_unary_stream, stream, handler_kwargs
        )

    async def __rpc_example_stream_unary(self, stream):
        """Pass the stream's async iterator to the unary-response handler."""
        handler_kwargs = dict(example_request_iterator=stream.__aiter__())
        await self._call_rpc_handler_server_unary(
            self.example_stream_unary, stream, handler_kwargs
        )

    async def __rpc_example_stream_stream(self, stream):
        """Pass the stream's async iterator to the stream-response handler."""
        handler_kwargs = dict(example_request_iterator=stream.__aiter__())
        await self._call_rpc_handler_server_stream(
            self.example_stream_stream, stream, handler_kwargs
        )

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the route -> ``grpclib.const.Handler`` table for this service.

        Each entry pairs the private adapter coroutine with the RPC's
        cardinality and its request/response message classes.
        """
        make_handler = grpclib.const.Handler
        cardinality = grpclib.const.Cardinality

        routes: Dict[str, grpclib.const.Handler] = {}
        routes["/example_service.ExampleService/ExampleUnaryUnary"] = make_handler(
            self.__rpc_example_unary_unary,
            cardinality.UNARY_UNARY,
            ExampleRequest,
            ExampleResponse,
        )
        routes["/example_service.ExampleService/ExampleUnaryStream"] = make_handler(
            self.__rpc_example_unary_stream,
            cardinality.UNARY_STREAM,
            ExampleRequest,
            ExampleResponse,
        )
        routes["/example_service.ExampleService/ExampleStreamUnary"] = make_handler(
            self.__rpc_example_stream_unary,
            cardinality.STREAM_UNARY,
            ExampleRequest,
            ExampleResponse,
        )
        routes["/example_service.ExampleService/ExampleStreamStream"] = make_handler(
            self.__rpc_example_stream_stream,
            cardinality.STREAM_STREAM,
            ExampleRequest,
            ExampleResponse,
        )
        return routes
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment