Skip to content

Instantly share code, notes, and snippets.

@vladmiller
Created August 8, 2025 15:34
Show Gist options
  • Select an option

  • Save vladmiller/0be83755e65cf5bd942ffba22be3ad35 to your computer and use it in GitHub Desktop.

Select an option

Save vladmiller/0be83755e65cf5bd942ffba22be3ad35 to your computer and use it in GitHub Desktop.
how I deal with dexie
// Factory to generate payment streams
const getPayments$ = () => {
const isLoading$ = new BehaviorSubject<boolean>(false)
const error$ = new Subject<ApiError | null>()
// Stream that holds currently actie DB
const db$ = activeDB$.pipe(map(mapDB<AgentDB>))
const baseCollection$ = db$.pipe(
switchMap((db) => liveQuery(() => db?.payments.toArray() ?? [])),
distinctUntilChanged(isEqual),
map((collection) => collection.map((i) => i.data)),
)
const filteredCollection$ = baseCollection$ // NOTE: Need to implement filtering stream
const search$ = new BehaviorSubject<string>("")
const query$ = new BehaviorSubject<object>({})
const partialQuery$ = new BehaviorSubject<object>({})
const load$ = new Subject<void>()
// Partial query effect
// Each time partialQuery$ is updated, merge the partial query with the existing query and update the query$
partialQuery$
.pipe(
withLatestFrom(query$),
map(([partialQuery, currentQuery]) => ({ ...currentQuery, ...partialQuery })),
)
.subscribe((query) => {
query$.next(query)
})
// Search stream factory
const searchStreamFactory = createSearchStreamFactory(baseCollection$, (item) => [
item.invoice_number,
item.payment_status,
item.type,
])
// Generate OC request body from current database state
const generateOCBody = async (db: AgentDB | undefined): Promise<string> => {
if (!db) return " "
const payments = await db.payments.toArray()
const body = payments.map((payment) => `${payment.id}:${payment.hash || ""}`).join("\n")
return body || " " // Return at least one empty byte if no data
}
// Load effect
combineLatest([load$, db$])
.pipe(
tap(() => {
isLoading$.next(true)
error$.next(null)
}),
switchMap(async ([, db]) => {
const ocBody = await generateOCBody(db)
return domainFetch("agent/v1/payments/oc", {
method: "POST",
body: ocBody,
headers: {
"Content-Type": "text/plain",
},
}).pipe(
handleApiError(),
handleOCStream<AgentPaymentV1>(async (update, remove) => {
await db?.payments.bulkPut(update)
await db?.payments.bulkDelete(remove)
}),
)
}),
switchMap((obs) => obs),
catchError((err) => {
isLoading$.next(false)
error$.next(err)
return []
}),
tap(() => {
isLoading$.next(false)
}),
)
.subscribe()
const getObject$ = (id: string): Observable<AgentPaymentV1 | undefined> => {
return db$.pipe(
switchMap((db) => liveQuery(() => db?.payments.get(id))),
map((payment) => payment?.data),
distinctUntilChanged(isEqual),
)
}
// KPI Block
const paymentsStats$ = baseCollection$.pipe(
map((payments) => {
const total = payments.length
const totalAmount = payments.reduce((sum, payment) => sum + payment.total_amount, 0)
return {
totalPayments: { value: total },
totalAmount: { value: totalAmount },
}
}),
shareReplay(1),
)
return {
isLoading$,
error$,
baseCollection$,
collection$: filteredCollection$,
load$,
getObject$,
search$,
searchFactoryFn: searchStreamFactory,
query$,
partialQuery$,
kpi: {
totalPayments: {
isLoading$: of(false),
error$: of(null),
data$: paymentsStats$.pipe(map((stats) => stats.totalPayments)),
},
totalAmount: {
isLoading$: of(false),
error$: of(null),
data$: paymentsStats$.pipe(map((stats) => stats.totalAmount)),
},
},
fn: {},
} satisfies SliceStream<AgentPaymentV1>
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment