Skip to content

Instantly share code, notes, and snippets.

@dfee
Created September 21, 2018 23:00
Show Gist options
  • Save dfee/02264e4e0ea8f6f38ffbeb5e03356473 to your computer and use it in GitHub Desktop.
Save dfee/02264e4e0ea8f6f38ffbeb5e03356473 to your computer and use it in GitHub Desktop.
import asyncio
import enum
import inspect
import typing
import graphql
import pytest
class MutationEnum(enum.Enum):
    """Kinds of change that can be announced for a stored record.

    The lowercase string values are what the mutation resolvers place under
    the ``"mutation"`` key of the event payload they emit.
    """

    CREATED = "created"  # a record was added
    UPDATED = "updated"  # an existing record was changed
    DELETED = "deleted"  # a record was removed
class User(typing.NamedTuple):
    """Immutable user record.

    Field names are camelCase; ``id`` is optional and defaults to ``None``.
    """

    # Required name parts.
    firstName: str
    lastName: str
    # Identifier; may be left unset when the record is constructed.
    id: typing.Optional[str] = None
class UserRepo:
    """In-memory user store with an async facade that simulates a database.

    Every operation exists twice: a synchronous ``_name`` method that works
    directly on ``self.registry`` and an ``async`` wrapper that sleeps
    briefly before delegating to it.
    """

    def __init__(self, **users):
        """Seed the registry with ``users``, a mapping of id -> record."""
        self.registry = users
        # Monotonic counter used by ``_allocate_id``; starting at the seed
        # size keeps the first id of an empty repo at "0".
        self._next_id = len(users)

    def _allocate_id(self):
        """Return a string id that is not currently used in the registry.

        Deriving the id from ``len(self.registry)`` alone hands out an id
        that is still in use once an earlier record has been deleted, which
        would silently overwrite the surviving record.
        """
        while True:
            candidate = str(self._next_id)
            self._next_id += 1
            if candidate not in self.registry:
                return candidate

    def _get(self, id_):
        """Return the record stored under ``id_``, or ``None`` if absent."""
        return self.registry.get(id_)

    async def get(self, id_):
        """Async variant of ``_get``."""
        await asyncio.sleep(.01)  # simulate async db
        return self._get(id_)

    def _create(self, **kwargs):
        """Build a ``User`` from ``kwargs`` under a fresh id and store it."""
        id_ = self._allocate_id()
        user = User(id=id_, **kwargs)
        self.registry[id_] = user
        return user

    async def create(self, **kwargs):
        """Async variant of ``_create``."""
        await asyncio.sleep(.01)  # simulate async db
        return self._create(**kwargs)

    def _update(self, **kwargs):
        """Replace fields of the record named by ``kwargs["id"]``.

        Raises:
            KeyError: if ``id`` is missing from ``kwargs`` or no record is
                stored under that id.
        """
        id_ = kwargs.pop("id")
        self.registry[id_] = self.registry[id_]._replace(**kwargs)
        return self.registry[id_]

    async def update(self, **kwargs):
        """Async variant of ``_update``."""
        await asyncio.sleep(.01)  # simulate async db
        return self._update(**kwargs)

    def _delete(self, id_):
        """Remove and return the record stored under ``id_``.

        Raises:
            KeyError: if no record is stored under ``id_``.
        """
        return self.registry.pop(id_)

    async def delete(self, id_):
        """Async variant of ``_delete``; returns the removed record."""
        await asyncio.sleep(.01)  # simulate async db
        return self._delete(id_)
# GraphQL enum built from the Python ``MutationEnum``.
mutation_type = graphql.GraphQLEnumType(
    name="MutationType",
    values=MutationEnum,
)

# A user as exposed through the API; every field is non-null.
user_type = graphql.GraphQLObjectType(
    name="UserType",
    fields={
        "id": graphql.GraphQLField(graphql.GraphQLNonNull(graphql.GraphQLID)),
        "firstName": graphql.GraphQLField(
            graphql.GraphQLNonNull(graphql.GraphQLString)
        ),
        "lastName": graphql.GraphQLField(
            graphql.GraphQLNonNull(graphql.GraphQLString)
        ),
    },
)

# Payload wrappers returned by the create / update mutations.
create_user_type = graphql.GraphQLObjectType(
    name="CreateUserType",
    fields={"User": graphql.GraphQLField(user_type)},
)
update_user_type = graphql.GraphQLObjectType(
    name="UpdateUserType",
    fields={"User": graphql.GraphQLField(user_type)},
)

# Payload delivered to subscribers: what happened, and the affected user.
subscription_user_type = graphql.GraphQLObjectType(
    name="SubscriptionUserType",
    fields={
        "mutation": graphql.GraphQLField(mutation_type),
        "node": graphql.GraphQLField(user_type),
    },
)
async def resolve_user(root, info, **args):
    """Resolve the ``User`` query by looking up ``args["id"]`` in the repo.

    The repo is taken from ``info.context["user_repo"]``; whatever its
    ``get`` coroutine yields is returned unchanged.
    """
    repo = info.context["user_repo"]
    return await repo.get(args["id"])
async def resolve_create_user(root, info, **args):
    """Create a user from ``args`` and announce the creation.

    The same payload is emitted twice: once on the channel named after the
    ``User`` class and once on the channel of the newly created instance.

    Returns:
        ``{"User": <created user>}``.
    """
    user = await info.context["user_repo"].create(**args)
    event = {"id": user.id, "mutation": MutationEnum.CREATED.value}
    emitter = info.context["event_emitter"]
    # Class-wide channel first, then the per-instance channel.
    for target in (User, user):
        emitter.emit(make_pubsub_name(target), event)
    return {"User": user}
async def resolve_delete_user(root, info, **args):
    """Delete the user named by ``args["id"]`` and announce the deletion.

    Returns:
        ``True`` when a user was deleted, ``False`` when no user with that
        id exists (nothing is deleted and no event is emitted).
    """
    repo = info.context["user_repo"]
    user = await repo.get(args["id"])
    if user is None:
        # The repo's ``get`` yields None for unknown ids; dereferencing
        # ``user.id`` below would otherwise raise AttributeError.
        return False
    await repo.delete(user.id)
    payload = {"id": user.id, "mutation": MutationEnum.DELETED.value}
    emitter = info.context["event_emitter"]
    # Notify the class-wide channel first, then the per-instance channel.
    emitter.emit(make_pubsub_name(User), payload)
    emitter.emit(make_pubsub_name(user), payload)
    return True
async def resolve_update_user(root, info, **args):
    """Apply the supplied field changes to a user and announce the update.

    Arguments passed explicitly as ``null`` arrive here as ``None``. They
    are dropped so they cannot overwrite ``firstName`` / ``lastName``, which
    ``UserType`` declares as non-null.

    Returns:
        ``{"User": <updated user>}``.

    Raises:
        KeyError: propagated from the repo when no user with the given id
            exists.
    """
    changes = {key: value for key, value in args.items() if value is not None}
    user = await info.context["user_repo"].update(**changes)
    payload = {"id": user.id, "mutation": MutationEnum.UPDATED.value}
    emitter = info.context["event_emitter"]
    # Notify the class-wide channel first, then the per-instance channel.
    emitter.emit(make_pubsub_name(User), payload)
    emitter.emit(make_pubsub_name(user), payload)
    return {"User": user}
def resolve_subscription_user(root, info, **args):
    """Turn one emitted event into the ``SubscriptionUserType`` payload.

    ``root`` is the event dict and must carry a ``"mutation"`` key holding
    a ``MutationEnum`` value. The user is read through the repo's
    synchronous ``_get``.
    """
    # The original author's note marks the commented-out coroutine below as
    # the proper implementation; this version uses the sync lookup instead.
    node = info.context["user_repo"]._get(args["id"])
    kind = MutationEnum(root["mutation"])
    return {"mutation": kind.value, "node": node}


# async def resolve_subscription_user(root, info, **args):
#     user = await info.context["user_repo"].get(args["id"])
#     return {"mutation": MutationEnum(root["mutation"]).value, "node": user}
def make_pubsub_name(obj):
    """Return the pub/sub channel name for a class or for one instance.

    A class maps to its ``__name__``; any other object maps to
    ``"<TypeName>__<obj.id>"``.
    """
    return (
        obj.__name__
        if inspect.isclass(obj)
        else f"{type(obj).__name__}__{obj.id}"
    )
async def subscribe_user(root, info, **args):
    """Yield every event emitted on the channel of the requested user.

    The user is fetched from the repo first so the channel name can be
    derived from it with ``make_pubsub_name``.
    """
    user = await info.context["user_repo"].get(args["id"])
    emitter = info.context["event_emitter"]
    channel = make_pubsub_name(user)
    events = graphql.pyutils.EventEmitterAsyncIterator(emitter, channel)
    async for event in events:
        yield event
# Executable schema: one query, three mutations and one subscription, each
# wired to its resolver.
schema = graphql.GraphQLSchema(
    query=graphql.GraphQLObjectType(
        name="RootQueryType",
        fields={
            # Fetch a single user by id.
            "User": graphql.GraphQLField(
                user_type,
                args={"id": graphql.GraphQLID},
                resolve=resolve_user,
            ),
        },
    ),
    mutation=graphql.GraphQLObjectType(
        name="RootMutationType",
        fields={
            "CreateUser": graphql.GraphQLField(
                create_user_type,
                args={
                    "firstName": graphql.GraphQLString,
                    "lastName": graphql.GraphQLString,
                },
                resolve=resolve_create_user,
            ),
            # Returns a plain boolean rather than a payload object.
            "DeleteUser": graphql.GraphQLField(
                graphql.GraphQLBoolean,
                args={"id": graphql.GraphQLNonNull(graphql.GraphQLID)},
                resolve=resolve_delete_user,
            ),
            # Only ``id`` is required; the name fields are optional.
            "UpdateUser": graphql.GraphQLField(
                update_user_type,
                args={
                    "id": graphql.GraphQLNonNull(graphql.GraphQLID),
                    "firstName": graphql.GraphQLString,
                    "lastName": graphql.GraphQLString,
                },
                resolve=resolve_update_user,
            ),
        },
    ),
    subscription=graphql.GraphQLObjectType(
        name="RootSubscriptionType",
        fields={
            # ``subscribe`` produces the event stream; ``resolve`` shapes
            # each event into the response payload.
            "User": graphql.GraphQLField(
                subscription_user_type,
                args={"id": graphql.GraphQLID},
                subscribe=subscribe_user,
                resolve=resolve_subscription_user,
            ),
        },
    ),
)
@pytest.fixture
def context():
    """Provide a fresh GraphQL context: an event emitter and an empty repo."""
    return dict(
        event_emitter=graphql.pyutils.EventEmitter(),
        user_repo=UserRepo(),
    )
@pytest.mark.asyncio
async def test_query_user(context):
    """A stored user can be fetched by id through the ``User`` query."""
    repo = context["user_repo"]
    user = await repo.create(firstName="bill", lastName="gates")
    query = (
        "\n"
        "query ($userId: ID!) {\n"
        "User(id: $userId) {\n"
        "id, firstName, lastName\n"
        "}\n"
        "}\n"
    )
    result = await graphql.graphql(
        schema,
        query,
        context_value=context,
        variable_values={"userId": user.id},
    )
    assert not result.errors
    # The record's fields are exactly the three selected in the query.
    assert result.data == {"User": user._asdict()}
@pytest.mark.asyncio
async def test_create_user(context):
    """``CreateUser`` stores the user and emits on both pub/sub channels."""
    received = {}

    def record(channel):
        # Listener that remembers the last message seen on ``channel``.
        return lambda message: received.__setitem__(channel, message)

    emitter = context["event_emitter"]
    for channel in ("User", "User__0"):
        emitter.add_listener(channel, record(channel))

    query = (
        "\n"
        "mutation ($firstName: String!, $lastName: String!) {\n"
        "CreateUser(\n"
        "firstName: $firstName,\n"
        "lastName: $lastName,\n"
        ") {\n"
        "User { id, firstName, lastName }\n"
        "}\n"
        "}"
    )
    variables = dict(firstName="bill", lastName="gates")
    result = await graphql.graphql(
        schema, query, context_value=context, variable_values=variables
    )
    user = await context["user_repo"].get("0")
    assert not result.errors
    assert result.data == {"CreateUser": {"User": user._asdict()}}
    created = {"id": "0", "mutation": MutationEnum.CREATED.value}
    assert received == {"User": created, "User__0": created}
@pytest.mark.asyncio
async def test_delete_user(context):
    """``DeleteUser`` removes the user and emits on both pub/sub channels."""
    received = {}

    def record(channel):
        # Listener that remembers the last message seen on ``channel``.
        return lambda message: received.__setitem__(channel, message)

    emitter = context["event_emitter"]
    for channel in ("User", "User__0"):
        emitter.add_listener(channel, record(channel))

    repo = context["user_repo"]
    user = await repo.create(firstName="bill", lastName="gates")
    query = (
        "\n"
        "mutation ($id: ID!) {\n"
        "DeleteUser(\n"
        "id: $id,\n"
        ")\n"
        "}"
    )
    result = await graphql.graphql(
        schema,
        query,
        context_value=context,
        variable_values={"id": user.id},
    )
    assert not await context["user_repo"].get(user.id)
    assert not result.errors
    assert result.data == {"DeleteUser": True}
    deleted = {"id": "0", "mutation": MutationEnum.DELETED.value}
    assert received == {"User": deleted, "User__0": deleted}
@pytest.mark.asyncio
async def test_update_user(context):
    """``UpdateUser`` changes the record and emits on both pub/sub channels."""
    received = {}

    def record(channel):
        # Listener that remembers the last message seen on ``channel``.
        return lambda message: received.__setitem__(channel, message)

    emitter = context["event_emitter"]
    for channel in ("User", "User__0"):
        emitter.add_listener(channel, record(channel))

    repo = context["user_repo"]
    user = await repo.create(firstName="bill", lastName="gates")
    query = (
        "\n"
        "mutation ($id: ID!, $firstName: String, $lastName: String) {\n"
        "UpdateUser(\n"
        "id: $id,\n"
        "firstName: $firstName,\n"
        "lastName: $lastName,\n"
        ") {\n"
        "User { id, firstName, lastName }\n"
        "}\n"
        "}"
    )
    variables = dict(id=user.id, firstName="willy", lastName="g")
    result = await graphql.graphql(
        schema, query, context_value=context, variable_values=variables
    )
    updated_user = await context["user_repo"].get("0")
    assert updated_user.id == user.id
    assert not result.errors
    assert result.data == {"UpdateUser": {"User": updated_user._asdict()}}
    updated = {"id": "0", "mutation": MutationEnum.UPDATED.value}
    assert received == {"User": updated, "User__0": updated}
@pytest.mark.asyncio
async def test_subscribe_user(context):
    """A subscriber receives an event emitted on its user's channel."""
    user = await context["user_repo"].create(firstName="bill", lastName="gates")
    query = (
        "\n"
        "subscription ($id: ID!) {\n"
        "User(id: $id) {\n"
        "mutation\n"
        "node { id, firstName, lastName }\n"
        "}\n"
        "}\n"
    )
    subscription = await graphql.subscribe(
        schema,
        graphql.parse(query),
        context_value=context,
        variable_values={"id": user.id},
    )
    assert isinstance(
        subscription, graphql.subscription.map_async_iterator.MapAsyncIterator
    )

    # Request the first result in the background, then wait before emitting:
    # ``subscribe_user`` awaits the repo lookup before it builds the event
    # iterator.
    pending = asyncio.ensure_future(subscription.__anext__())
    await asyncio.sleep(0.1)

    channel = make_pubsub_name(user)
    event = {"id": user.id, "mutation": MutationEnum.UPDATED.value}
    context["event_emitter"].emit(channel, event)
    await asyncio.sleep(0.01)  # give the pending task a chance to complete

    assert pending.done()
    result = pending.result()
    assert not result.errors
    expected_node = {"id": "0", "firstName": "bill", "lastName": "gates"}
    assert result.data == {
        "User": {
            "mutation": MutationEnum.UPDATED.name,
            "node": expected_node,
        }
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment