Skip to content

Instantly share code, notes, and snippets.

@pacuna
Last active October 3, 2020 23:20
Show Gist options
  • Save pacuna/e5e901c73b725d9358152bcb457a0b84 to your computer and use it in GitHub Desktop.
Save pacuna/e5e901c73b725d9358152bcb457a0b84 to your computer and use it in GitHub Desktop.
Implementing SQLite UDFs and table-valued functions in Python
from peewee import *
from playhouse.sqlite_ext import TableFunction
import json
# https://github.com/coleifer/peewee/
# Basic connection and SQL execution against the SQLite file foo.db.
db = SqliteDatabase('foo.db')
db.connect()

# pageid is declared TEXT rather than STRING: SQLite assigns NUMERIC affinity
# to a column declared STRING, so a digit-only page id would be converted to a
# number on insert instead of being kept as text.
db.execute_sql(
    "CREATE TABLE IF NOT EXISTS pageAds (pageid TEXT, adid_list JSON);")

# Insert sample data: one row per page, with its ad ids as a JSON array.
# NOTE(review): the insert is unconditional, so rerunning the script against
# an existing foo.db presumably appends the same two rows again.
db.execute_sql(
    "INSERT INTO pageAds VALUES ('front_page', JSON_ARRAY(1,2,3)), ('contact_page', JSON_ARRAY(3,4,5));")

# Show the stored rows.
cursor = db.execute_sql("SELECT * FROM pageAds")
for value in cursor:
    print(value)
# Define array function to return a JSON_ARRAY object in SQLite
@db.func('array')
def array(*argv):
    """Serialize the SQL call's arguments, in order, as a JSON array string.

    Args:
        *argv: The values passed to ``ARRAY(...)`` in the SQL statement.

    Returns:
        str: JSON text of a list holding the arguments in call order.
    """
    return json.dumps(list(argv))
# Example: call the ARRAY UDF from SQL and print every result row.
cursor = db.execute_sql("SELECT ARRAY(1,2,3);")
for row in cursor:
    print(row)
# Output:
# ('[1, 2, 3]',)
# Define named_struct function to return a JSON_OBJECT in SQLite
@db.func('named_struct')
def named_struct(*argv):
    """Build a JSON object string from alternating key/value arguments.

    Arguments at even positions are keys and each following argument is the
    matching value. A trailing key with no value is ignored, and a repeated
    key keeps the value given last.

    Args:
        *argv: ``key1, value1, key2, value2, ...`` as passed from SQL.

    Returns:
        str: JSON text of the resulting object.
    """
    keys = argv[::2]
    values = argv[1::2]
    # zip stops at the shorter sequence, which drops an unpaired final key.
    return json.dumps(dict(zip(keys, values)))
# Example: call the NAMED_STRUCT UDF from SQL and print every result row.
cursor = db.execute_sql("SELECT NAMED_STRUCT('a', 1, 'b', 2);")
for row in cursor:
    print(row)
# Output:
# ('{"a": 1, "b": 2}',)
# Table-valued function to simulate LATERAL VIEW with explode by joining to it
# This is not supported by the official python sqlite module.
@db.table_function('explode')
class Explode(TableFunction):
    """Table-valued function that yields one row per element of a JSON array.

    Registered in SQL as ``explode``. It takes a single argument, the array
    encoded as JSON text, and exposes one output column named ``col``.
    """

    columns = ['col']
    params = ['array_data']

    def initialize(self, array_data):
        """Decode the JSON argument and reset the read position.

        Args:
            array_data: JSON text of the array to expand, or None.
        """
        # A SQL NULL argument (presumably delivered as None) is treated as an
        # empty array so the query yields no rows instead of failing inside
        # json.loads.
        self.array_data = [] if array_data is None else json.loads(array_data)
        self.cur = 0

    def iterate(self, idx):
        """Return the next array element as a one-column row.

        Args:
            idx: Row counter supplied by the caller; unused here because the
                position is tracked in ``self.cur``.

        Returns:
            tuple: ``(element,)`` for the current position.

        Raises:
            StopIteration: When every element has been returned.
        """
        if self.cur >= len(self.array_data):
            raise StopIteration
        ret, self.cur = self.array_data[self.cur], self.cur + 1
        return (ret,)
# Example: expand a three-element array into one row per element.
cursor = db.execute_sql("SELECT t.col FROM EXPLODE(ARRAY(1,2,3)) as t")
for row in cursor:
    print(row)
# Output:
# (1,)
# (2,)
# (3,)
# Table-valued function for posexplode
@db.table_function('posexplode')
class Posexplode(TableFunction):
    """Table-valued function yielding (position, element) per array element.

    Registered in SQL as ``posexplode``. It takes a single argument, the
    array encoded as JSON text, and exposes two output columns: ``pos`` (the
    zero-based index of the element) and ``val`` (the element itself).
    """

    columns = ['pos', 'val']
    params = ['array_data']

    def initialize(self, array_data):
        """Decode the JSON argument and reset the read position.

        Args:
            array_data: JSON text of the array to expand, or None.
        """
        # A SQL NULL argument (presumably delivered as None) is treated as an
        # empty array so the query yields no rows instead of failing inside
        # json.loads.
        self.array_data = [] if array_data is None else json.loads(array_data)
        self.cur = 0

    def iterate(self, idx):
        """Return the next ``(pos, val)`` row.

        Args:
            idx: Row counter supplied by the caller; unused here because the
                position is tracked in ``self.cur``.

        Returns:
            tuple: ``(position, element)`` in the same order as ``columns``.

        Raises:
            StopIteration: When every element has been returned.
        """
        if self.cur >= len(self.array_data):
            raise StopIteration
        pos = self.cur
        self.cur += 1
        # The tuple order must match `columns`: position first, then value.
        return (pos, self.array_data[pos])


# example
cursor = db.execute_sql(
    "SELECT t.pos, t.val FROM POSEXPLODE(ARRAY(1,2,3)) as t")
for value in cursor:
    print(value)
# prints:
# (0, 1)
# (1, 2)
# (2, 3)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment