Skip to content

Instantly share code, notes, and snippets.

@igalshilman
Created June 23, 2021 08:26
Show Gist options
  • Save igalshilman/40dcb0890c7cece76976227d8cbe6742 to your computer and use it in GitHub Desktop.
Save igalshilman/40dcb0890c7cece76976227d8cbe6742 to your computer and use it in GitHub Desktop.
Smoke Python Test
=================
from datetime import timedelta
from statefun import *
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from statefun import *
import asyncio
from commands_pb2 import *
from statefun.messages import egress_message_builder
from target.statefun.request_reply_pb2 import TypedValue
# Registry of stateful functions; `fn` is bound to it via `functions.bind`
# and it is handed to `RequestReplyHandler` when serving the endpoint.
functions = StatefulFunctions()
# Type descriptors built from the protobuf message classes. They are used
# with `message.is_type` / `message.as_type` and as `value_type` when
# building outgoing messages.
SourceCommandType = make_protobuf_type(SourceCommand)
CommandsType = make_protobuf_type(Commands)
VerificationResultType = make_protobuf_type(VerificationResult)
def increment(context, _):
    """Add one to the counter kept in ``context.storage.state``.

    A falsy stored value (for example ``None`` before the first write) is
    treated as zero. The second argument is ignored.
    """
    context.storage.state = (context.storage.state or 0) + 1
def send(context, cmd):
    """Forward ``cmd.commands`` to the ``v/f1`` function addressed by ``cmd.target``.

    The target id is the string form of ``cmd.target`` and the payload is
    sent with the ``CommandsType`` type descriptor.
    """
    outgoing = message_builder(
        target_typename="v/f1",
        target_id=f"{cmd.target}",
        value=cmd.commands,
        value_type=CommandsType,
    )
    context.send(outgoing)
def send_after(context, cmd):
    """Schedule ``cmd.commands`` for delayed delivery to ``v/f1``.

    The message is addressed to the id ``str(cmd.target)`` and handed to
    ``context.send_after`` with a delay of ``timedelta(seconds=0.001)``.
    """
    delay = timedelta(seconds=0.001)
    delayed = message_builder(
        target_typename="v/f1",
        target_id=str(cmd.target),
        value=cmd.commands,
        value_type=CommandsType,
    )
    context.send_after(duration=delay, message=delayed)
def send_egress(context: Context, _):
    """Emit an egress message of type ``v/sink`` carrying a default ``TypedValue``.

    The second argument is ignored.
    """
    context.send_egress(
        EgressMessage(typename="v/sink", typed_value=TypedValue())
    )
def verify(context, command):
    """Report this function's stored counter to the ``v/verification`` egress.

    Fills a ``VerificationResult`` with the function's own id (parsed as an
    integer from ``context.address.id``), the ``expected`` value carried by
    ``command``, and the current ``context.storage.state`` (a falsy value is
    reported as 0), then sends it as an egress message.
    """
    own_id = int(context.address.id)
    outcome = VerificationResult()
    outcome.id = own_id
    outcome.expected = command.expected
    outcome.actual = context.storage.state or 0
    context.send_egress(
        egress_message_builder(
            "v/verification",
            outcome,
            value_type=VerificationResultType,
        )
    )
# Table mapping a command name to the handler that executes it; each handler
# takes (context, sub_command). Presumably the keys are meant to match the
# member names of the `command` oneof that `interpret` reads via WhichOneof.
# NOTE(review): `increment` is defined above but has no entry here - confirm
# the corresponding oneof field name and register it if that is intended.
COMMANDS = {"send_egress": send_egress, "send": send, "send_after": send_after, "verify": verify}
def interpret(context, commands: Commands):
    """Execute every command in ``commands`` against ``context``, in order.

    For each entry of ``commands.command`` the populated member of its
    ``command`` oneof selects a handler from the module-level ``COMMANDS``
    table. The handler is called with ``context`` and that member's
    sub-message.

    Raises:
        ValueError: if an entry has no ``command`` member set, or the set
            member has no handler registered in ``COMMANDS``.
    """
    for command in commands.command:
        clause = command.WhichOneof("command")
        if clause is None:
            # WhichOneof returns None when no member of the oneof is set;
            # getattr(command, None) would otherwise fail obscurely.
            raise ValueError("command has no 'command' oneof member set")
        # Look the handler up in the dispatch table. The command message
        # itself is not subscriptable, so `command[clause]` could never work.
        handler = COMMANDS.get(clause)
        if handler is None:
            raise ValueError(f"no handler registered for command {clause!r}")
        handler(context, getattr(command, clause))
@functions.bind(typename="v/f1", specs=[ValueSpec(name="state", type=LongType)])
async def fn(context, message):
    """Entry point of the ``v/f1`` function: unpack the commands and run them.

    Accepts either a ``SourceCommand`` (whose ``commands`` field is used) or
    a bare ``Commands`` message, and passes the commands to ``interpret``.

    Raises:
        ValueError: if the message is of neither type.
    """
    from_source = message.is_type(SourceCommandType)
    if not from_source and not message.is_type(CommandsType):
        raise ValueError('unknown ' + message.target_typename)
    if from_source:
        payload = message.as_type(SourceCommandType).commands
    else:
        payload = message.as_type(CommandsType)
    interpret(context, payload)
#
# Serve the endpoint
#
from aiohttp import web
# Request/reply handler over the registered functions; `handle` feeds it the
# raw request body through `handle_async`.
handler = RequestReplyHandler(functions)
async def handle(request):
    """Serve one HTTP request by delegating its body to the statefun handler.

    Reads the raw request body, passes it to ``handler.handle_async`` and
    returns the result as an ``application/octet-stream`` response.
    """
    payload = await request.read()
    return web.Response(
        body=await handler.handle_async(payload),
        content_type="application/octet-stream",
    )
# aiohttp application exposing the handler: POST /statefun is routed to
# `handle`.
app = web.Application()
app.add_routes([web.post('/statefun', handle)])
# Start the server on port 8000 only when this file is run as a script.
if __name__ == '__main__':
    web.run_app(app, port=8000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment