Skip to content

Instantly share code, notes, and snippets.

@gihankarunarathne
Last active February 28, 2023 12:44
Show Gist options
  • Save gihankarunarathne/56719cdebc96a131ac8ff939eff2aebf to your computer and use it in GitHub Desktop.
Save gihankarunarathne/56719cdebc96a131ac8ff939eff2aebf to your computer and use it in GitHub Desktop.
Query Spanner data using SQLAlchemy Dialect in plain format.
# You can pass `execution_options` such as https://github.com/googleapis/python-spanner-sqlalchemy#stale-reads.
# You don't need to set them while creating the Spanner db instance.
def foo():
with engines.get_engine('purchase').execution_options(read_only=True).connect() as conn:
reseller_thresholds = zsql(
conn,
'''
SELECT
p.ref_type,
p.ref_code,
p.sku,
p.id_product_fulltype,
rt.sku_category,
COALESCE(ANY_VALUE(rt.threshold), ANY_VALUE(rr.default_threshold)) AS threshold,
COALESCE(STRING_AGG(CAST(IF(p.id_sales_order_item IN UNNEST(@soi_list), p.id_sales_order_item, NULL) AS STRING)), '') AS id_sales_order_items,
SUM(p.item_status) AS purchased_count,
@period AS period
FROM purchase_history p
LEFT JOIN@{JOIN_TYPE=APPLY_JOIN} sku_category_mapping scm ON scm.sku = p.sku AND scm.country_code = @country_code
LEFT JOIN@{JOIN_TYPE=APPLY_JOIN} reseller_threshold rt
ON p.id_product_fulltype = rt.id_product_fulltype
AND COALESCE(scm.sku_category, 'Others') = rt.sku_category
AND p.ref_type = rt.ref_type
AND p.country_code = rt.country_code
AND p.id_mp = rt.id_mp
LEFT JOIN reseller_ref rr ON p.ref_type = rr.ref_type
WHERE (p.ref_code, p.ref_type, p.country_code, p.id_mp) IN UNNEST(@ref_type_list)
AND p.sku IN UNNEST(@sku_list)
AND DATE_SUB(CURRENT_DATE(), INTERVAL @period DAY) <= DATE(p.created_at)
GROUP BY 1,2,3,4,5
''',
ref_type_list=ref_type_list,
soi_list=soi_list,
sku_list=sku_list,
country_code=country_code,
period=30,
).dicts()
# Similar to defining MySQL database connections on engines.py, define Spanner connections like below
# Ref: https://github.com/fastfishio/backend-python-boilerplate/blob/master/src/libutil/engines.py#L14
def define_engines(app_name):
# ... other engines
engineutil.define_engine('test_spanner', 'spanner+spanner:///projects/noon-staging/instances/core01/databases/fooddb')
# OR
engineutil.define_spanner_engine('test_spanner', 'spanner+spanner:///projects/noon-staging/instances/core01/databases/fooddb')
from noonutil.v1.spannerutil import zsql
def foo():
with engines.get_engine('test_spanner').begin() as conn:
stores = zsql(conn, '''
SELECT store_code
FROM store_following
WHERE customer_code = @customer_code
AND is_active
''', customer_code=ctx.customer_code).scalars()
# Or you can query data using `jsql`. But this is only working for primary variables. This will not work for list, set etc.
from jsql import sql
def foo():
with engines.get_engine('test_spanner').begin() as conn:
stores = sql(conn, '''
SELECT store_code
FROM store_following
WHERE customer_code = @customer_code
AND is_active
''', customer_code=ctx.customer_code).scalars()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment