Last active
October 9, 2024 12:59
-
-
Save crucialfelix/d14506cdb9d3a04427a72aadcad30af4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
from django.db import models | |
from graphene.relay import PageInfo | |
from graphene_django.fields import DjangoConnectionField | |
from graphql_relay.connection.arrayconnection import cursor_to_offset, offset_to_cursor | |
class SimplifiedDjangoConnectionField(DjangoConnectionField): | |
"""A simplified extension of DjangoConnectionField | |
This avoids calling len() or .count() on the QuerySet which has very | |
poor performance on PostgresQL. The value is only used to determine if there | |
is another page. | |
This does NOT support the GraphQL args "before" or "last" | |
Nest Seekers does not use these anyway. If you do need to use them for some purpose, | |
then change just that schema field from SimplifiedDjangoConnectionField back to DjangoConnectionField | |
""" | |
@classmethod | |
def resolve_connection(cls, connection, default_manager, args, iterable): | |
# iterable should be sliceable ie. a QuerySet or list | |
if isinstance(iterable, models.Manager): | |
iterable = iterable.all() | |
assert hasattr(iterable, "__getitem__"), ValueError( | |
"Iterable must support list slice {}".format(type(iterable)) | |
) | |
connection = simplified_connection_from_list_slice( | |
iterable, args or {}, connection_type=connection, edge_type=connection.Edge | |
) | |
connection.iterable = iterable | |
return connection | |
def simplified_connection_from_list_slice( | |
list_slice, args=None, connection_type=None, edge_type=None, max_limit=250 | |
): | |
""" | |
Given a slice (subset) of an array, returns a connection object for use in | |
GraphQL. | |
This function is similar to `connectionFromArray`, but is intended for use | |
cases where you know the cardinality of the connection, consider it too large | |
to materialize the entire array, and instead wish pass in a slice of the | |
total result large enough to cover the range specified in `args`. | |
""" | |
assert "before" not in args, NotImplementedError( | |
"GraphQL query arg 'before' is not implemented in 'simplified_connection_from_list_slice'" | |
) | |
assert "last" not in args, NotImplementedError( | |
"GraphQL query arg 'last' is not implemented in 'simplified_connection_from_list_slice'" | |
) | |
after_cursor = args.get("after") | |
after = cursor_to_offset(after_cursor) if after_cursor else None | |
first = parse_int(args.get("first")) or max_limit | |
start = 0 if after is None else after + 1 | |
end = start + first | |
# slice off one more than we will be returning | |
items = list_slice[start : end + 1] | |
# so we know if there is another page | |
has_next_page = len(items) > first | |
has_previous_page = start > 0 | |
items = items[:first] | |
# print( | |
# "start: {}, end:{} length: {} prev:{} next:{}".format( | |
# start, end, len(items), has_previous_page, has_next_page | |
# ) | |
# ) | |
edges = [ | |
edge_type(node=node, cursor=offset_to_cursor(start + i)) | |
for i, node in enumerate(items) | |
] | |
first_edge_cursor = edges[0].cursor if edges else None | |
last_edge_cursor = edges[-1].cursor if edges else None | |
# print( | |
# "edges: {} first:{} last: {}".format( | |
# len(edges), | |
# cursor_to_offset(first_edge_cursor), | |
# cursor_to_offset(last_edge_cursor), | |
# ) | |
# ) | |
conn = connection_type( | |
edges=edges, | |
page_info=PageInfo( | |
start_cursor=first_edge_cursor, | |
end_cursor=last_edge_cursor, | |
has_previous_page=has_previous_page, | |
has_next_page=has_next_page, | |
), | |
) | |
conn.length = len(items) | |
return conn | |
# from globalapp.utils import parse_int | |
num_re = re.compile(r"[^0-9\.]") | |
def parse_int(input): | |
""" | |
Parses a string as int | |
@returns {int|None} | |
""" | |
if input is None: | |
return None | |
input = unicode(input).strip() | |
if len(input) > 19: | |
return None | |
clean = num_re.sub(u"", input) | |
if input == u'': | |
return None | |
# minus sign would get stripped off so add it back in if it was there | |
if input.startswith("-"): | |
clean = u"-%s" % clean | |
try: | |
return int(float(clean)) | |
except ValueError as error: | |
# log.warning("Failed to parse_int: '%s' ERROR: %s", input, error) | |
return None |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment