Skip to content

Instantly share code, notes, and snippets.

@camsaul
Created December 3, 2020 22:10
Show Gist options
  • Save camsaul/f09b68d9982268ea9e478d47e4ba6b80 to your computer and use it in GitHub Desktop.
Save camsaul/f09b68d9982268ea9e478d47e4ba6b80 to your computer and use it in GitHub Desktop.
MultiQP
(ns metabase.query-processor.multiple
(:require [clojure.core.async :as a]
[metabase.query-processor :as qp]
[metabase.query-processor.context :as qp.context]
[metabase.query-processor.context.default :as context.default]))
(defn- process-query-append-results
  "Run a single `query` and fold its rows into the accumulator `init` using the reducing function `rf`.

  If a value can be polled from the canceled channel of `context`, the query is not run and `init` is returned
  wrapped in `reduced`. Otherwise `query` is run with that same canceled channel and an `:rff` that ignores the
  results metadata and always produces a reducing function that:

  * starts from `init` (zero-arity);
  * returns the accumulator untouched on completion (one-arity), so the completing arity of `rf` is NOT invoked
    here;
  * hands each row to `rf` (two-arity)."
  [query rf init context]
  (let [canceled-chan (qp.context/canceled-chan context)
        ;; reducing fn that continues from `init` instead of starting a fresh accumulator
        append-rf     (fn
                        ([] init)
                        ([acc] acc)
                        ([acc row] (rf acc row)))]
    (if-not (a/poll! canceled-chan)
      (qp/process-query
       query
       {:canceled-chan canceled-chan
        :rff           (fn [_metadata] append-rf)})
      ;; canceled: wrap in `reduced` so an enclosing `reduce` stops early
      (ensure-reduced init))))
(defn- process-queries-append-results
  "Fold the results of every query in `queries`, in order, into a single accumulator.

  Starts from `init`; each query is run with `process-query-append-results`, which receives the accumulator
  produced by the previous query as its initial value. Because the fold is a plain `reduce`, a `reduced` value
  coming back from `process-query-append-results` stops the remaining queries from running."
  [queries rf init context]
  (letfn [(append-query-results [result-so-far query]
            (process-query-append-results query rf result-so-far context))]
    (reduce append-query-results init queries)))
(defn- append-queries-context
  "Return `context` with its `:rff` wrapped so that the rows of `more-queries` are appended to the results.

  When `more-queries` is empty, `context` is returned unchanged. Otherwise the reducing function produced by the
  wrapped `:rff` behaves like the original one for the init and step arities; its completing arity first reduces
  the results of `more-queries` into the accumulator (using the original reducing function and the unmodified
  `context`) and only then calls the original completing arity."
  [context more-queries]
  (letfn [(wrap-rff [rff]
            (fn [metadata]
              (let [rf (rff metadata)]
                (fn
                  ([] (rf))
                  ([acc]
                   ;; first query is done: append the remaining queries' rows, then complete
                   (let [combined (process-queries-append-results more-queries rf acc context)]
                     (rf combined)))
                  ([acc row] (rf acc row))))))]
    (if (seq more-queries)
      (update context :rff wrap-rff)
      context)))
(defn process-multiple-queries
  "Process a sequence of `queries` as one: the first query is run with `qp/process-query`, using a context whose
  `:rff` has been wrapped by `append-queries-context` so the rows of the remaining queries are appended.

  With a single argument, the context is the one returned by `context.default/default-context`."
  ([queries]
   (let [default-context (context.default/default-context)]
     (process-multiple-queries queries default-context)))

  ([[first-query & more-queries] context]
   (->> more-queries
        (append-queries-context context)
        (qp/process-query first-query))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment