Skip to content

Instantly share code, notes, and snippets.

@hvardhanx
Created October 28, 2017 16:37
Show Gist options
  • Save hvardhanx/1f5082596548f0262804e3cef5f2dc88 to your computer and use it in GitHub Desktop.
Save hvardhanx/1f5082596548f0262804e3cef5f2dc88 to your computer and use it in GitHub Desktop.
Generated by the gRPC Python protocol compiler plugin. Some edits were made by hand to make it work with flatbuffers. (:
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc
import flatbuffers
import MyGame.Sample.Monster as Monster
import MyGame.Sample.Weapon as Weapon
builder = flatbuffers.Builder(0)
class MonsterStorageStub(object):
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.Store = channel.unary_unary(
'/MyGame.Sample.MonsterStorage/Store',
request_serializer=builder.SerializeToString,
response_deserializer=builder.FromString,
)
self.Retrieve = channel.unary_unary(
'/MyGame.Sample.MonsterStorage/Retrieve',
request_serializer=builder.SerializeToString,
response_deserializer=builder.FromString,
)
class MonsterStorageServicer(object):
def Store(self, request, context):
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def Retrieve(self, request, context):
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_MonsterStorageServicer_to_server(servicer, server):
rpc_method_handlers = {
'Store': grpc.unary_unary_rpc_method_handler(
servicer.Store,
request_deserializer=builder.FromString,
response_serializer=builder.SerializeToString,
),
'Retrieve': grpc.unary_unary_rpc_method_handler(
servicer.Retrieve,
request_deserializer=builder.FromString,
response_serializer=builder.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'MyGame.Sample.MonsterStorage', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
try:
# THESE ELEMENTS WILL BE DEPRECATED.
# Please use the generated *_pb2_grpc.py files instead.
import grpc
import flatbuffers
import MyGame.Sample.Monster as Monster
import MyGame.Sample.Weapon as Weapon
from grpc.beta import implementations as beta_implementations
from grpc.beta import interfaces as beta_interfaces
from grpc.framework.common import cardinality
from grpc.framework.interfaces.face import utilities as face_utilities
class MonsterStorageStub(object):
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.Store = channel.unary_unary(
'/MyGame.Sample.MonsterStorage/Store',
request_serializer=builder.SerializeToString,
response_deserializer=builder.FromString,
)
self.Retrieve = channel.unary_unary(
'/MyGame.Sample.MonsterStorage/Retrieve',
request_serializer=builder.SerializeToString,
response_deserializer=builder.FromString,
)
class MonsterStorageServicer(object):
def Store(self, request, context):
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def Retrieve(self, request, context):
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_MonsterStorageServicer_to_server(servicer, server):
rpc_method_handlers = {
'Store': grpc.unary_unary_rpc_method_handler(
servicer.Store,
request_deserializer=builder.FromString,
response_serializer=builder.SerializeToString,
),
'Retrieve': grpc.unary_unary_rpc_method_handler(
servicer.Retrieve,
request_deserializer=builder.FromString,
response_serializer=builder.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'MyGame.Sample.MonsterStorage', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
class BetaMonsterStorageServicer(object):
"""The Beta API is deprecated for 0.15.0 and later.
It is recommended to use the GA API (classes and functions in this
file not marked beta) for all further purposes. This class was generated
only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0."""
def Store(self, request, context):
context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
def Retrieve(self, request, context):
context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
class BetaMonsterStorageStub(object):
"""The Beta API is deprecated for 0.15.0 and later.
It is recommended to use the GA API (classes and functions in this
file not marked beta) for all further purposes. This class was generated
only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0."""
def Store(self, request, timeout, metadata=None, with_call=False, protocol_options=None):
raise NotImplementedError()
Store.future = None
def Retrieve(self, request, timeout, metadata=None, with_call=False, protocol_options=None):
raise NotImplementedError()
Retrieve.future = None
def beta_create_MonsterStorage_server(servicer, pool=None, pool_size=None, default_timeout=None, maximum_timeout=None):
"""The Beta API is deprecated for 0.15.0 and later.
It is recommended to use the GA API (classes and functions in this
file not marked beta) for all further purposes. This function was
generated only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0"""
request_deserializers = {
('MyGame.Sample.MonsterStorage', 'Retrieve'): builder.FromString,
('MyGame.Sample.MonsterStorage', 'Store'): builder.FromString,
}
response_serializers = {
('MyGame.Sample.MonsterStorage', 'Retrieve'): builder.SerializeToString,
('MyGame.Sample.MonsterStorage', 'Store'): builder.SerializeToString,
}
method_implementations = {
('MyGame.Sample.MonsterStorage', 'Retrieve'): face_utilities.unary_unary_inline(servicer.Retrieve),
('MyGame.Sample.MonsterStorage', 'Store'): face_utilities.unary_unary_inline(servicer.Store),
}
server_options = beta_implementations.server_options(request_deserializers=request_deserializers, response_serializers=response_serializers, thread_pool=pool, thread_pool_size=pool_size, default_timeout=default_timeout, maximum_timeout=maximum_timeout)
return beta_implementations.server(method_implementations, options=server_options)
def beta_create_MonsterStorage_stub(channel, host=None, metadata_transformer=None, pool=None, pool_size=None):
"""The Beta API is deprecated for 0.15.0 and later.
It is recommended to use the GA API (classes and functions in this
file not marked beta) for all further purposes. This function was
generated only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0"""
request_serializers = {
('MyGame.Sample.MonsterStorage', 'Retrieve'): builder.SerializeToString,
('MyGame.Sample.MonsterStorage', 'Store'): builder.SerializeToString,
}
response_deserializers = {
('MyGame.Sample.MonsterStorage', 'Retrieve'): builder.FromString,
('MyGame.Sample.MonsterStorage', 'Store'): builder.FromString,
}
cardinalities = {
'Retrieve': cardinality.Cardinality.UNARY_UNARY,
'Store': cardinality.Cardinality.UNARY_UNARY,
}
stub_options = beta_implementations.stub_options(host=host, metadata_transformer=metadata_transformer, request_serializers=request_serializers, response_deserializers=response_deserializers, thread_pool=pool, thread_pool_size=pool_size)
return beta_implementations.dynamic_stub(channel, 'MyGame.Sample.MonsterStorage', cardinalities, options=stub_options)
except ImportError:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment