Created
September 21, 2018 23:00
-
-
Save dfee/02264e4e0ea8f6f38ffbeb5e03356473 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import enum | |
import inspect | |
import typing | |
import graphql | |
import pytest | |
class MutationEnum(enum.Enum): | |
CREATED = "created" | |
UPDATED = "updated" | |
DELETED = "deleted" | |
class User(typing.NamedTuple): | |
firstName: str | |
lastName: str | |
id: typing.Optional[str] = None | |
class UserRepo: | |
def __init__(self, **users): | |
self.registry = users | |
def _get(self, id_): | |
return self.registry.get(id_) | |
async def get(self, id_): | |
await asyncio.sleep(.01) # simulate async db | |
return self._get(id_) | |
def _create(self, **kwargs): | |
id_ = str(len(self.registry)) | |
self.registry[id_] = User(id=id_, **kwargs) | |
return self.registry[id_] | |
async def create(self, **kwargs): | |
await asyncio.sleep(.01) # simulate async db | |
return self._create(**kwargs) | |
def _update(self, **kwargs): | |
id_ = kwargs.pop("id") | |
self.registry[id_] = self.registry[id_]._replace(**kwargs) | |
return self.registry[id_] | |
async def update(self, **kwargs): | |
await asyncio.sleep(.01) # simulate async db | |
return self._update(**kwargs) | |
def _delete(self, id_): | |
return self.registry.pop(id_) | |
async def delete(self, id_): | |
await asyncio.sleep(.01) # simulate async db | |
self._delete(id_) | |
mutation_type = graphql.GraphQLEnumType("MutationType", MutationEnum) | |
user_type = graphql.GraphQLObjectType( | |
"UserType", | |
{ | |
"id": graphql.GraphQLField(graphql.GraphQLNonNull(graphql.GraphQLID)), | |
"firstName": graphql.GraphQLField( | |
graphql.GraphQLNonNull(graphql.GraphQLString) | |
), | |
"lastName": graphql.GraphQLField( | |
graphql.GraphQLNonNull(graphql.GraphQLString) | |
), | |
}, | |
) | |
create_user_type = graphql.GraphQLObjectType( | |
"CreateUserType", {"User": graphql.GraphQLField(user_type)} | |
) | |
update_user_type = graphql.GraphQLObjectType( | |
"UpdateUserType", {"User": graphql.GraphQLField(user_type)} | |
) | |
subscription_user_type = graphql.GraphQLObjectType( | |
"SubscriptionUserType", | |
{ | |
"mutation": graphql.GraphQLField(mutation_type), | |
"node": graphql.GraphQLField(user_type), | |
}, | |
) | |
async def resolve_user(root, info, **args): | |
return await info.context["user_repo"].get(args["id"]) | |
async def resolve_create_user(root, info, **args): | |
user = await info.context["user_repo"].create(**args) | |
payload = {"id": user.id, "mutation": MutationEnum.CREATED.value} | |
info.context["event_emitter"].emit(make_pubsub_name(User), payload) | |
info.context["event_emitter"].emit(make_pubsub_name(user), payload) | |
return {"User": user} | |
async def resolve_delete_user(root, info, **args): | |
user = await info.context["user_repo"].get(args["id"]) | |
await info.context["user_repo"].delete(user.id) | |
payload = {"id": user.id, "mutation": MutationEnum.DELETED.value} | |
info.context["event_emitter"].emit(make_pubsub_name(User), payload) | |
info.context["event_emitter"].emit(make_pubsub_name(user), payload) | |
return True | |
async def resolve_update_user(root, info, **args): | |
user = await info.context["user_repo"].update(**args) | |
payload = {"id": user.id, "mutation": MutationEnum.UPDATED.value} | |
info.context["event_emitter"].emit(make_pubsub_name(User), payload) | |
info.context["event_emitter"].emit(make_pubsub_name(user), payload) | |
return {"User": user} | |
def resolve_subscription_user(root, info, **args): | |
# Look below for a proper implementation | |
user = info.context["user_repo"]._get(args["id"]) | |
return {"mutation": MutationEnum(root["mutation"]).value, "node": user} | |
# async def resolve_subscription_user(root, info, **args): | |
# user = await info.context["user_repo"].get(args["id"]) | |
# return {"mutation": MutationEnum(root["mutation"]).value, "node": user} | |
def make_pubsub_name(obj): | |
if inspect.isclass(obj): | |
return obj.__name__ | |
return f"{type(obj).__name__}__{obj.id}" | |
async def subscribe_user(root, info, **args): | |
user = await info.context["user_repo"].get(args["id"]) | |
ee = info.context["event_emitter"] | |
name = make_pubsub_name(user) | |
async for msg in graphql.pyutils.EventEmitterAsyncIterator(ee, name): | |
yield msg | |
schema = graphql.GraphQLSchema( | |
query=graphql.GraphQLObjectType( | |
"RootQueryType", | |
{ | |
"User": graphql.GraphQLField( | |
user_type, args={"id": graphql.GraphQLID}, resolve=resolve_user | |
) | |
}, | |
), | |
mutation=graphql.GraphQLObjectType( | |
"RootMutationType", | |
{ | |
"CreateUser": graphql.GraphQLField( | |
create_user_type, | |
args={ | |
"firstName": graphql.GraphQLString, | |
"lastName": graphql.GraphQLString, | |
}, | |
resolve=resolve_create_user, | |
), | |
"DeleteUser": graphql.GraphQLField( | |
graphql.GraphQLBoolean, | |
args={"id": graphql.GraphQLNonNull(graphql.GraphQLID)}, | |
resolve=resolve_delete_user, | |
), | |
"UpdateUser": graphql.GraphQLField( | |
update_user_type, | |
args={ | |
"id": graphql.GraphQLNonNull(graphql.GraphQLID), | |
"firstName": graphql.GraphQLString, | |
"lastName": graphql.GraphQLString, | |
}, | |
resolve=resolve_update_user, | |
), | |
}, | |
), | |
subscription=graphql.GraphQLObjectType( | |
"RootSubscriptionType", | |
{ | |
"User": graphql.GraphQLField( | |
subscription_user_type, | |
args={"id": graphql.GraphQLID}, | |
subscribe=subscribe_user, | |
resolve=resolve_subscription_user, | |
) | |
}, | |
), | |
) | |
@pytest.fixture | |
def context(): | |
return { | |
"event_emitter": graphql.pyutils.EventEmitter(), | |
"user_repo": UserRepo(), | |
} | |
@pytest.mark.asyncio | |
async def test_query_user(context): | |
user = await context["user_repo"].create(firstName="bill", lastName="gates") | |
query = """ | |
query ($userId: ID!) { | |
User(id: $userId) { | |
id, firstName, lastName | |
} | |
} | |
""" | |
result = await graphql.graphql( | |
schema, | |
query, | |
context_value=context, | |
variable_values={"userId": user.id}, | |
) | |
assert not result.errors | |
assert result.data == { | |
"User": { | |
"id": user.id, | |
"firstName": user.firstName, | |
"lastName": user.lastName, | |
} | |
} | |
@pytest.mark.asyncio | |
async def test_create_user(context): | |
received = {} | |
def update_received(event_name): | |
def _update(msg): | |
received[event_name] = msg | |
return _update | |
context["event_emitter"].add_listener("User", update_received("User")) | |
context["event_emitter"].add_listener("User__0", update_received("User__0")) | |
query = """ | |
mutation ($firstName: String!, $lastName: String!) { | |
CreateUser( | |
firstName: $firstName, | |
lastName: $lastName, | |
) { | |
User { id, firstName, lastName } | |
} | |
}""" | |
variables = {"firstName": "bill", "lastName": "gates"} | |
result = await graphql.graphql( | |
schema, query, context_value=context, variable_values=variables | |
) | |
user = await context["user_repo"].get("0") | |
assert not result.errors | |
assert result.data == { | |
"CreateUser": { | |
"User": { | |
"id": user.id, | |
"firstName": user.firstName, | |
"lastName": user.lastName, | |
} | |
} | |
} | |
assert received == { | |
"User": {"id": "0", "mutation": MutationEnum.CREATED.value}, | |
"User__0": {"id": "0", "mutation": MutationEnum.CREATED.value}, | |
} | |
@pytest.mark.asyncio | |
async def test_delete_user(context): | |
received = {} | |
def update_received(event_name): | |
def _update(msg): | |
received[event_name] = msg | |
return _update | |
context["event_emitter"].add_listener("User", update_received("User")) | |
context["event_emitter"].add_listener("User__0", update_received("User__0")) | |
user = await context["user_repo"].create(firstName="bill", lastName="gates") | |
query = """ | |
mutation ($id: ID!) { | |
DeleteUser( | |
id: $id, | |
) | |
}""" | |
variables = {"id": user.id} | |
result = await graphql.graphql( | |
schema, query, context_value=context, variable_values=variables | |
) | |
assert not await context["user_repo"].get(user.id) | |
assert not result.errors | |
assert result.data == {"DeleteUser": True} | |
assert received == { | |
"User": {"id": "0", "mutation": MutationEnum.DELETED.value}, | |
"User__0": {"id": "0", "mutation": MutationEnum.DELETED.value}, | |
} | |
@pytest.mark.asyncio | |
async def test_update_user(context): | |
received = {} | |
def update_received(event_name): | |
def _update(msg): | |
received[event_name] = msg | |
return _update | |
context["event_emitter"].add_listener("User", update_received("User")) | |
context["event_emitter"].add_listener("User__0", update_received("User__0")) | |
user = await context["user_repo"].create(firstName="bill", lastName="gates") | |
query = """ | |
mutation ($id: ID!, $firstName: String, $lastName: String) { | |
UpdateUser( | |
id: $id, | |
firstName: $firstName, | |
lastName: $lastName, | |
) { | |
User { id, firstName, lastName } | |
} | |
}""" | |
variables = {"id": user.id, "firstName": "willy", "lastName": "g"} | |
result = await graphql.graphql( | |
schema, query, context_value=context, variable_values=variables | |
) | |
updated_user = await context["user_repo"].get("0") | |
assert updated_user.id == user.id | |
assert not result.errors | |
assert result.data == { | |
"UpdateUser": { | |
"User": { | |
"id": updated_user.id, | |
"firstName": updated_user.firstName, | |
"lastName": updated_user.lastName, | |
} | |
} | |
} | |
assert received == { | |
"User": {"id": "0", "mutation": MutationEnum.UPDATED.value}, | |
"User__0": {"id": "0", "mutation": MutationEnum.UPDATED.value}, | |
} | |
@pytest.mark.asyncio | |
async def test_subscribe_user(context): | |
user = await context["user_repo"].create(firstName="bill", lastName="gates") | |
query = """ | |
subscription ($id: ID!) { | |
User(id: $id) { | |
mutation | |
node { id, firstName, lastName } | |
} | |
} | |
""" | |
variables = {"id": user.id} | |
sub = await graphql.subscribe( | |
schema, | |
graphql.parse(query), | |
context_value=context, | |
variable_values=variables, | |
) | |
assert isinstance( | |
sub, graphql.subscription.map_async_iterator.MapAsyncIterator | |
) | |
fut = asyncio.ensure_future(sub.__anext__()) | |
await asyncio.sleep(.1) | |
context["event_emitter"].emit( | |
make_pubsub_name(user), | |
{"id": user.id, "mutation": MutationEnum.UPDATED.value}, | |
) | |
await asyncio.sleep(.01) | |
assert fut.done() | |
result = fut.result() | |
assert not result.errors | |
assert result.data == { | |
"User": { | |
"mutation": MutationEnum.UPDATED.name, | |
"node": {"id": "0", "firstName": "bill", "lastName": "gates"}, | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment