Skip to content

Instantly share code, notes, and snippets.

@tantaman
Created November 27, 2023 19:29
Show Gist options
  • Save tantaman/a4db1320ecfd4495a7ce5cf083f768ff to your computer and use it in GitHub Desktop.
Save tantaman/a4db1320ecfd4495a7ce5cf083f768ff to your computer and use it in GitHub Desktop.
// Automatically-generated file
#![allow(dead_code)]
#![allow(non_snake_case)]
#![allow(unused_imports)]
#![allow(unused_parens)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(non_camel_case_types)]
#[cfg(test)]
use hashing::*;
use paste::paste;
use derive_more::{Add,Sub,Neg,From,Into,AddAssign};
use dbsp::{
algebra::{ZSet, MulByRef, F32, F64, Semigroup, SemigroupValue, ZRingValue,
UnimplementedSemigroup, DefaultSemigroup, HasZero, AddByRef, NegByRef,
AddAssignByRef,
},
circuit::{Circuit, Stream},
operator::{
Generator,
FilterMap,
Fold,
time_series::{RelRange, RelOffset, OrdPartitionedIndexedZSet},
MaxSemigroup,
MinSemigroup,
},
trace::ord::{OrdIndexedZSet, OrdZSet},
zset,
indexed_zset,
DBWeight,
DBData,
DBSPHandle,
Runtime,
};
use dbsp_adapters::{deserialize_table_record, Catalog};
use size_of::*;
use ::serde::{Deserialize,Serialize};
use compare::{Compare, Extract};
use std::{
convert::identity,
ops::{Add, Neg, AddAssign},
fmt::{Debug, Formatter, Result as FmtResult},
cell::RefCell,
path::Path,
rc::Rc,
marker::PhantomData,
str::FromStr,
};
use rust_decimal::Decimal;
use tuple::declare_tuples;
use sqllib::{
*,
casts::*,
geopoint::*,
timestamp::*,
interval::*,
string::*,
operators::*,
};
#[cfg(test)]
use sqlvalue::*;
#[cfg(test)]
use readers::*;
#[cfg(test)]
use sqlx::{AnyConnection, any::AnyRow, Row};
type Weight = i64;
#[derive(Clone)]
pub struct Semigroup1<T0, TS0>(PhantomData<(T0, TS0)>);
impl<T0, TS0> Semigroup<(T0, )> for Semigroup1<T0, TS0>
where
TS0: Semigroup<T0>
{
fn combine(left: &(T0, ), right:&(T0, )) -> (T0, ) {
(
TS0::combine(&left.0, &right.0),
)
}
}
declare_tuples! {
Tuple1<T0>,
Tuple2<T0, T1>,
Tuple4<T0, T1, T2, T3>,
Tuple5<T0, T1, T2, T3, T4>,
Tuple6<T0, T1, T2, T3, T4, T5>,
Tuple7<T0, T1, T2, T3, T4, T5, T6>,
Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>,
Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>,
}
pub fn circuit(workers: usize) -> (DBSPHandle, Catalog) {
let (circuit, catalog) = Runtime::init_circuit(workers, |circuit| {
let mut catalog = Catalog::new();// rel#1:LogicalTableScan.(table=[schema, TRACK])
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
struct TABLE {
ID: Option<i32>,
NAME: Option<String>,
ALBUM_ID: Option<i32>,
DURATION: Option<i32>,
}
impl From<TABLE> for Tuple4<Option<i32>, Option<String>, Option<i32>, Option<i32>> {
fn from(table: TABLE) -> Self {
Tuple4::new(table.ID,table.NAME,table.ALBUM_ID,table.DURATION,)
}
}
impl From<Tuple4<Option<i32>, Option<String>, Option<i32>, Option<i32>>> for TABLE {
fn from(tuple: Tuple4<Option<i32>, Option<String>, Option<i32>, Option<i32>>) -> Self {
Self {
ID: tuple.0,
NAME: tuple.1,
ALBUM_ID: tuple.2,
DURATION: tuple.3,
}
}
}
deserialize_table_record!(TABLE["TRACK", 4] {
(ID, "ID", false, Option<i32>, Some(None)),
(NAME, "NAME", false, Option<String>, Some(None)),
(ALBUM_ID, "ALBUM_ID", false, Option<i32>, Some(None)),
(DURATION, "DURATION", false, Option<i32>, Some(None))
});
// DBSPSourceOperator 2386
// CREATE TABLE `TRACK` (`ID` INTEGER, `NAME` `TEXT`, `ALBUM_ID` INTEGER, `DURATION` INTEGER)
let (TRACK, handle0) = circuit.add_input_set::<Tuple4<Option<i32>, Option<String>, Option<i32>, Option<i32>>, Weight>();
// rel#3:LogicalTableScan.(table=[schema, ALBUM])
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
struct TABLE_0 {
ID: Option<i32>,
NAME: Option<String>,
}
impl From<TABLE_0> for Tuple2<Option<i32>, Option<String>> {
fn from(table: TABLE_0) -> Self {
Tuple2::new(table.ID,table.NAME,)
}
}
impl From<Tuple2<Option<i32>, Option<String>>> for TABLE_0 {
fn from(tuple: Tuple2<Option<i32>, Option<String>>) -> Self {
Self {
ID: tuple.0,
NAME: tuple.1,
}
}
}
deserialize_table_record!(TABLE_0["ALBUM", 2] {
(ID, "ID", false, Option<i32>, Some(None)),
(NAME, "NAME", false, Option<String>, Some(None))
});
// DBSPSourceOperator 2392
// CREATE TABLE `ALBUM` (`ID` INTEGER, `NAME` `TEXT`)
let (ALBUM, handle1) = circuit.add_input_set::<Tuple2<Option<i32>, Option<String>>, Weight>();
// rel#143:LogicalJoin.(left=LogicalTableScan#1,right=LogicalTableScan#3,condition==($2, $4),joinType=inner)
// DBSPFilterOperator 2409
let stream134: Stream<_, OrdZSet<Tuple4<Option<i32>, Option<String>, Option<i32>, Option<i32>>, Weight>> = TRACK.filter(move |r: &Tuple4<Option<i32>, Option<String>, Option<i32>, Option<i32>>, | ->
bool {
(!r.2.is_none())
});
// rel#143:LogicalJoin.(left=LogicalTableScan#1,right=LogicalTableScan#3,condition==($2, $4),joinType=inner)
// DBSPFilterOperator 2427
let stream136: Stream<_, OrdZSet<Tuple2<Option<i32>, Option<String>>, Weight>> = ALBUM.filter(move |r: &Tuple2<Option<i32>, Option<String>>, | ->
bool {
(!r.0.is_none())
});
// rel#143:LogicalJoin.(left=LogicalTableScan#1,right=LogicalTableScan#3,condition==($2, $4),joinType=inner)
// DBSPIndexOperator 2465
let stream137: Stream<_, OrdIndexedZSet<(i32, ), Tuple4<Option<i32>, Option<String>, Option<i32>, Option<i32>>, Weight>> = stream134.index_with(move |l: &Tuple4<Option<i32>, Option<String>, Option<i32>, Option<i32>>, | ->
((i32, ), Tuple4<Option<i32>, Option<String>, Option<i32>, Option<i32>>, ) {
((cast_to_i32_i32N(l.2), ), Tuple4::new(l.0, l.1.clone(), l.2, l.3), )
});
// rel#143:LogicalJoin.(left=LogicalTableScan#1,right=LogicalTableScan#3,condition==($2, $4),joinType=inner)
// DBSPIndexOperator 2494
let stream138: Stream<_, OrdIndexedZSet<(i32, ), Tuple2<Option<i32>, Option<String>>, Weight>> = stream136.index_with(move |r: &Tuple2<Option<i32>, Option<String>>, | ->
((i32, ), Tuple2<Option<i32>, Option<String>>, ) {
((cast_to_i32_i32N(r.0), ), Tuple2::new(r.0, r.1.clone()), )
});
// rel#143:LogicalJoin.(left=LogicalTableScan#1,right=LogicalTableScan#3,condition==($2, $4),joinType=inner)
// DBSPJoinOperator 2536
let stream139: Stream<_, OrdZSet<Tuple6<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>>, Weight>> = stream137.stream_join(&stream138, move |k: &(i32, ), l: &Tuple4<Option<i32>, Option<String>, Option<i32>, Option<i32>>, r: &Tuple2<Option<i32>, Option<String>>, | ->
Tuple6<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>> {
Tuple6::new(l.0, l.1.clone(), l.2, l.3, r.0, r.1.clone())
});
// rel#7:LogicalTableScan.(table=[schema, PLAYLIST_TRACK])
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
struct TABLE_1 {
PLAYLIST_ID: Option<i32>,
TRACK_ID: Option<i32>,
ADDED_AT: Option<i32>,
NUMBER: Option<i32>,
}
impl From<TABLE_1> for Tuple4<Option<i32>, Option<i32>, Option<i32>, Option<i32>> {
fn from(table: TABLE_1) -> Self {
Tuple4::new(table.PLAYLIST_ID,table.TRACK_ID,table.ADDED_AT,table.NUMBER,)
}
}
impl From<Tuple4<Option<i32>, Option<i32>, Option<i32>, Option<i32>>> for TABLE_1 {
fn from(tuple: Tuple4<Option<i32>, Option<i32>, Option<i32>, Option<i32>>) -> Self {
Self {
PLAYLIST_ID: tuple.0,
TRACK_ID: tuple.1,
ADDED_AT: tuple.2,
NUMBER: tuple.3,
}
}
}
deserialize_table_record!(TABLE_1["PLAYLIST_TRACK", 4] {
(PLAYLIST_ID, "PLAYLIST_ID", false, Option<i32>, Some(None)),
(TRACK_ID, "TRACK_ID", false, Option<i32>, Some(None)),
(ADDED_AT, "ADDED_AT", false, Option<i32>, Some(None)),
(NUMBER, "NUMBER", false, Option<i32>, Some(None))
});
// DBSPSourceOperator 2544
// CREATE TABLE `PLAYLIST_TRACK` (`PLAYLIST_ID` INTEGER, `TRACK_ID` INTEGER, `ADDED_AT` INTEGER, `NUMBER` INTEGER)
let (PLAYLIST_TRACK, handle2) = circuit.add_input_set::<Tuple4<Option<i32>, Option<i32>, Option<i32>, Option<i32>>, Weight>();
// rel#146:LogicalJoin.(left=LogicalJoin#143,right=LogicalTableScan#7,condition==($7, $0),joinType=inner)
// DBSPFilterOperator 2561
let stream141: Stream<_, OrdZSet<Tuple6<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>>, Weight>> = stream139.filter(move |r: &Tuple6<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>>, | ->
bool {
(!r.0.is_none())
});
// rel#146:LogicalJoin.(left=LogicalJoin#143,right=LogicalTableScan#7,condition==($7, $0),joinType=inner)
// DBSPFilterOperator 2579
let stream143: Stream<_, OrdZSet<Tuple4<Option<i32>, Option<i32>, Option<i32>, Option<i32>>, Weight>> = PLAYLIST_TRACK.filter(move |r: &Tuple4<Option<i32>, Option<i32>, Option<i32>, Option<i32>>, | ->
bool {
(!r.1.is_none())
});
// rel#146:LogicalJoin.(left=LogicalJoin#143,right=LogicalTableScan#7,condition==($7, $0),joinType=inner)
// DBSPIndexOperator 2626
let stream144: Stream<_, OrdIndexedZSet<(i32, ), Tuple6<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>>, Weight>> = stream141.index_with(move |l: &Tuple6<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>>, | ->
((i32, ), Tuple6<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>>, ) {
((cast_to_i32_i32N(l.0), ), Tuple6::new(l.0, l.1.clone(), l.2, l.3, l.4, l.5.clone()), )
});
// rel#146:LogicalJoin.(left=LogicalJoin#143,right=LogicalTableScan#7,condition==($7, $0),joinType=inner)
// DBSPIndexOperator 2662
let stream145: Stream<_, OrdIndexedZSet<(i32, ), Tuple4<Option<i32>, Option<i32>, Option<i32>, Option<i32>>, Weight>> = stream143.index_with(move |r: &Tuple4<Option<i32>, Option<i32>, Option<i32>, Option<i32>>, | ->
((i32, ), Tuple4<Option<i32>, Option<i32>, Option<i32>, Option<i32>>, ) {
((cast_to_i32_i32N(r.1), ), Tuple4::new(r.0, r.1, r.2, r.3), )
});
// rel#146:LogicalJoin.(left=LogicalJoin#143,right=LogicalTableScan#7,condition==($7, $0),joinType=inner)
// DBSPJoinOperator 2720
let stream146: Stream<_, OrdZSet<Tuple10<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<i32>>, Weight>> = stream144.stream_join(&stream145, move |k: &(i32, ), l: &Tuple6<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>>, r: &Tuple4<Option<i32>, Option<i32>, Option<i32>, Option<i32>>, | ->
Tuple10<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<i32>> {
Tuple10::new(l.0, l.1.clone(), l.2, l.3, l.4, l.5.clone(), r.0, r.1, r.2, r.3)
});
// rel#11:LogicalTableScan.(table=[schema, ARG])
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
struct TABLE_2 {
A: Option<i32>,
}
impl From<TABLE_2> for Tuple1<Option<i32>> {
fn from(table: TABLE_2) -> Self {
Tuple1::new(table.A,)
}
}
impl From<Tuple1<Option<i32>>> for TABLE_2 {
fn from(tuple: Tuple1<Option<i32>>) -> Self {
Self {
A: tuple.0,
}
}
}
deserialize_table_record!(TABLE_2["ARG", 1] {
(A, "A", false, Option<i32>, Some(None))
});
// DBSPSourceOperator 2725
// CREATE TABLE `ARG` (`A` INTEGER)
let (ARG, handle3) = circuit.add_input_set::<Tuple1<Option<i32>>, Weight>();
// rel#151:LogicalAggregate.(input=LogicalTableScan#11,group={},agg#0=SINGLE_VALUE($0))
// DBSPIndexOperator 2744
let stream147: Stream<_, OrdIndexedZSet<(), Tuple1<Option<i32>>, Weight>> = ARG.index_with(move |t: &Tuple1<Option<i32>>, | ->
((), Tuple1<Option<i32>>, ) {
((), Tuple1::new(t.0), )
});
// rel#151:LogicalAggregate.(input=LogicalTableScan#11,group={},agg#0=SINGLE_VALUE($0))
// DBSPAggregateOperator 3168
let stream167: Stream<_, OrdIndexedZSet<(), Tuple1<Option<i32>>, Weight>> = stream147.stream_aggregate(Fold::<_, Semigroup1<Option<i32>, DefaultOptSemigroup<i32>>, _, _>::with_output((None::<i32>, ), move |a: &mut (Option<i32>, ), v: &Tuple1<Option<i32>>, w: Weight, | {
*a = (v.0, )
}, move |a: (Option<i32>, ), | ->
Tuple1<Option<i32>> {
Tuple1::new(a.0)
}));
// rel#151:LogicalAggregate.(input=LogicalTableScan#11,group={},agg#0=SINGLE_VALUE($0))
// DBSPMapOperator 3170
let stream168: Stream<_, OrdZSet<Tuple1<Option<i32>>, Weight>> = stream167.map(move |kv: (&(), &Tuple1<Option<i32>>, ), | ->
Tuple1<Option<i32>> {
Tuple1::new(kv.1.0)
});
// rel#151:LogicalAggregate.(input=LogicalTableScan#11,group={},agg#0=SINGLE_VALUE($0))
// DBSPMapOperator 3175
let stream169: Stream<_, OrdZSet<Tuple1<Option<i32>>, Weight>> = stream168.map(move |_t: &Tuple1<Option<i32>>, | ->
Tuple1<Option<i32>> {
Tuple1::new(None::<i32>)
});
// rel#151:LogicalAggregate.(input=LogicalTableScan#11,group={},agg#0=SINGLE_VALUE($0))
// DBSPNegateOperator 3177
let stream170: Stream<_, OrdZSet<Tuple1<Option<i32>>, Weight>> = stream169.neg();
// rel#151:LogicalAggregate.(input=LogicalTableScan#11,group={},agg#0=SINGLE_VALUE($0))
let stream152 = circuit.add_source(Generator::new(|| zset!(
Tuple1::new(None::<i32>) => 1,
)));
// rel#151:LogicalAggregate.(input=LogicalTableScan#11,group={},agg#0=SINGLE_VALUE($0))
// DBSPSumOperator 3178
let stream171: Stream<_, OrdZSet<Tuple1<Option<i32>>, Weight>> = stream152.sum([&stream170, &stream168]);
// rel#153:LogicalJoin.(left=LogicalJoin#146,right=LogicalAggregate#151,condition=true,joinType=left)
// DBSPIndexOperator 2890
let stream154: Stream<_, OrdIndexedZSet<(), Tuple10<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<i32>>, Weight>> = stream146.index_with(move |l: &Tuple10<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<i32>>, | ->
((), Tuple10<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<i32>>, ) {
((), Tuple10::new(l.0, l.1.clone(), l.2, l.3, l.4, l.5.clone(), l.6, l.7, l.8, l.9), )
});
// rel#153:LogicalJoin.(left=LogicalJoin#146,right=LogicalAggregate#151,condition=true,joinType=left)
// DBSPIndexOperator 3180
let stream172: Stream<_, OrdIndexedZSet<(), Tuple1<Option<i32>>, Weight>> = stream171.index_with(move |r: &Tuple1<Option<i32>>, | ->
((), Tuple1<Option<i32>>, ) {
((), Tuple1::new(r.0), )
});
// rel#153:LogicalJoin.(left=LogicalJoin#146,right=LogicalAggregate#151,condition=true,joinType=left)
// DBSPJoinOperator 3182
let stream173: Stream<_, OrdZSet<Tuple11<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<i32>, Option<i32>>, Weight>> = stream154.stream_join(&stream172, move |k: &(), l: &Tuple10<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<i32>>, r: &Tuple1<Option<i32>>, | ->
Tuple11<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<i32>, Option<i32>> {
Tuple11::new(l.0, l.1.clone(), l.2, l.3, l.4, l.5.clone(), l.6, l.7, l.8, l.9, r.0)
});
// rel#153:LogicalJoin.(left=LogicalJoin#146,right=LogicalAggregate#151,condition=true,joinType=left)
// DBSPMapOperator 3184
let stream174: Stream<_, OrdZSet<Tuple10<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<i32>>, Weight>> = stream173.map(move |j: &Tuple11<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<i32>, Option<i32>>, | ->
Tuple10<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<i32>> {
Tuple10::new(j.0, j.1.clone(), j.2, j.3, j.4, j.5.clone(), j.6, j.7, j.8, j.9)
});
// rel#153:LogicalJoin.(left=LogicalJoin#146,right=LogicalAggregate#151,condition=true,joinType=left)
// DBSPDistinctOperator 3186
let stream175: Stream<_, OrdZSet<Tuple10<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<i32>>, Weight>> = stream174.stream_distinct();
// rel#153:LogicalJoin.(left=LogicalJoin#146,right=LogicalAggregate#151,condition=true,joinType=left)
// DBSPSubtractOperator 3187
let stream176: Stream<_, OrdZSet<Tuple10<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<i32>>, Weight>> = stream146.minus(&stream175);
// rel#153:LogicalJoin.(left=LogicalJoin#146,right=LogicalAggregate#151,condition=true,joinType=left)
// DBSPDistinctOperator 3188
let stream177: Stream<_, OrdZSet<Tuple10<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<i32>>, Weight>> = stream176.stream_distinct();
// rel#155:LogicalProject.(input=LogicalJoin#153,inputs=0..1,exprs=[$3, $5, $8, $9, $10])
// DBSPMapOperator 3190
let stream178: Stream<_, OrdZSet<Tuple7<Option<i32>, Option<String>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>>, Weight>> = stream173.map(move |t: &Tuple11<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<i32>, Option<i32>>, | ->
Tuple7<Option<i32>, Option<String>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>> {
Tuple7::new(t.0, t.1.clone(), t.3, t.5.clone(), t.8, t.9, t.10)
});
// rel#153:LogicalJoin.(left=LogicalJoin#146,right=LogicalAggregate#151,condition=true,joinType=left)
// DBSPMapOperator 3193
let stream179: Stream<_, OrdZSet<Tuple7<Option<i32>, Option<String>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>>, Weight>> = stream177.map(move |l: &Tuple10<Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>, Option<i32>>, | ->
Tuple7<Option<i32>, Option<String>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>> {
Tuple7::new(l.0, l.1.clone().clone(), l.3, l.5.clone().clone(), l.8, l.9, None::<i32>)
});
// rel#153:LogicalJoin.(left=LogicalJoin#146,right=LogicalAggregate#151,condition=true,joinType=left)
// DBSPSumOperator 3195
let stream180: Stream<_, OrdZSet<Tuple7<Option<i32>, Option<String>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>>, Weight>> = stream178.sum([&stream179]);
// rel#157:LogicalFilter.(input=LogicalProject#155,condition==($0, $6))
// DBSPFilterOperator 3198
let stream182: Stream<_, OrdZSet<Tuple7<Option<i32>, Option<String>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>>, Weight>> = stream180.filter(move |t: &Tuple7<Option<i32>, Option<String>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>>, | ->
bool {
wrap_bool(eq_i32N_i32N(t.0, t.6))
});
// rel#159:LogicalProject.(input=LogicalFilter#157,exprs=[$4, $5, $1, $2, $3])
// DBSPMapOperator 3201
let stream183: Stream<_, OrdZSet<Tuple5<Option<i32>, Option<i32>, Option<String>, Option<i32>, Option<String>>, Weight>> = stream182.map(move |t: &Tuple7<Option<i32>, Option<String>, Option<i32>, Option<String>, Option<i32>, Option<i32>, Option<i32>>, | ->
Tuple5<Option<i32>, Option<i32>, Option<String>, Option<i32>, Option<String>> {
Tuple5::new(t.4, t.5, t.1.clone(), t.2, t.3.clone())
});
// CREATE VIEW `PLAYLIST_VIEW` AS
// SELECT `PLAYLIST_TRACK`.`ADDED_AT`, `PLAYLIST_TRACK`.`NUMBER`, `TRACK`.`NAME`, `TRACK`.`DURATION`, `ALBUM`.`NAME` AS `ALBUM_NAME`
// FROM `schema`.`TRACK` AS `TRACK`
// INNER JOIN `schema`.`ALBUM` AS `ALBUM` ON `TRACK`.`ALBUM_ID` = `ALBUM`.`ID`
// INNER JOIN `schema`.`PLAYLIST_TRACK` AS `PLAYLIST_TRACK` ON `PLAYLIST_TRACK`.`TRACK_ID` = `TRACK`.`ID`
// WHERE `TRACK`.`ID` = (((SELECT `ARG`.`A`
// FROM `schema`.`ARG` AS `ARG`)))
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
struct TABLE_3 {
ADDED_AT: Option<i32>,
NUMBER: Option<i32>,
NAME: Option<String>,
DURATION: Option<i32>,
ALBUM_NAME: Option<String>,
}
impl From<TABLE_3> for Tuple5<Option<i32>, Option<i32>, Option<String>, Option<i32>, Option<String>> {
fn from(table: TABLE_3) -> Self {
Tuple5::new(table.ADDED_AT,table.NUMBER,table.NAME,table.DURATION,table.ALBUM_NAME,)
}
}
impl From<Tuple5<Option<i32>, Option<i32>, Option<String>, Option<i32>, Option<String>>> for TABLE_3 {
fn from(tuple: Tuple5<Option<i32>, Option<i32>, Option<String>, Option<i32>, Option<String>>) -> Self {
Self {
ADDED_AT: tuple.0,
NUMBER: tuple.1,
NAME: tuple.2,
DURATION: tuple.3,
ALBUM_NAME: tuple.4,
}
}
}
deserialize_table_record!(TABLE_3["PLAYLIST_VIEW", 5] {
(ADDED_AT, "ADDED_AT", false, Option<i32>, Some(None)),
(NUMBER, "NUMBER", false, Option<i32>, Some(None)),
(NAME, "NAME", false, Option<String>, Some(None)),
(DURATION, "DURATION", false, Option<i32>, Some(None)),
(ALBUM_NAME, "ALBUM_NAME", false, Option<String>, Some(None))
});
catalog.register_input_set::<_, TABLE>("TRACK", TRACK.clone(), handle0);
catalog.register_input_set::<_, TABLE_0>("ALBUM", ALBUM.clone(), handle1);
catalog.register_input_set::<_, TABLE_1>("PLAYLIST_TRACK", PLAYLIST_TRACK.clone(), handle2);
catalog.register_input_set::<_, TABLE_2>("ARG", ARG.clone(), handle3);
catalog.register_output_zset::<_, TABLE_3>("PLAYLIST_VIEW", stream183);
Ok(catalog)
}).unwrap();
(circuit, catalog)
}
./sql-to-dbsp music.sql
CREATE TABLE track (
id INTEGER,
name TEXT,
album_id INTEGER,
duration INTEGER
-- PRIMARY KEY (id)
);
CREATE TABLE playlist_track (
playlist_id INTEGER,
track_id INTEGER,
added_at INTEGER,
number INTEGER
-- PRIMARY KEY (playlist_id, track_id)
);
CREATE TABLE artist (
id INTEGER,
name TEXT
-- PRIMARY KEY (id)
);
CREATE TABLE track_artist (
track_id INTEGER,
artist_id INTEGER
-- PRIMARY KEY (track_id, artist_id)
);
CREATE TABLE album (
id INTEGER,
name TEXT
-- PRIMARY KEY (id)
);
CREATE TABLE arg (
a INTEGER
);
CREATE VIEW playlist_view AS
SELECT
playlist_track.added_at,
playlist_track.number,
track.name,
track.duration,
album.name as album_name
-- CONCAT(artist.name) AS artists
FROM track
JOIN album ON track.album_id = album.id
-- JOIN track_artist ON track_artist.track_id = track.id
-- JOIN artist ON artist.id = track_artist.artist_id
JOIN playlist_track ON playlist_track.track_id = track.id
WHERE track.id = (SELECT a FROM arg);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment