Skip to content

Instantly share code, notes, and snippets.

View MichaelDrogalis's full-sized avatar

Michael Drogalis MichaelDrogalis

  • Confluent
  • Seattle, WA
View GitHub Profile
@MichaelDrogalis
MichaelDrogalis / gist:bc620a7617396704125b
Last active May 22, 2018 14:26
The Anatomy of an Onyx Program

The Anatomy of an Onyx Program

In this tutorial, we'll take an in-depth view of what's happening when you execute a simple Onyx program. All of the code can be found in the Onyx Starter repository if you'd like to follow along. The code uses the development environment with HornetQ and ZooKeeper running in memory, so you don't need additional dependencies to run the example for yourself on your machine.

The Workflow

At the core of the program is the workflow - the flow of data that we ingest, apply transformations to, and send to an output for storage. In this program, we're going to ingest some sentences from an input source, split the sentence into individual words, play with capitalization, and add a suffix. Finally, we'll send the transformed data to an output source.

Let's examine the workflow pictorially:

(ns ^:no-doc onyx.logging-configuration
(:require [com.stuartsierra.component :as component]
[taoensso.timbre :refer [info] :as timbre]))
(defrecord LoggingConfiguration [file config]
component/Lifecycle
(start [component]
(if config
(timbre/set-config! [] config)
[{:onyx/name :partition-datoms
:onyx/ident :datomic/partition-datoms
:onyx/type :input
:onyx/medium :datomic
:onyx/consumption :sequential
:onyx/bootstrap? true
:datomic/uri db-uri
:datomic/t t
:datomic/datoms-per-segment batch-size
:datomic/partition :com.mdrogalis/people
user=> (def a 4984967348648376483967984674386738947648397848673468943796984937)
#'user/a
user=> (type a)
clojure.lang.BigInt
user=> (def b 9347683768743769834764843787348967496747698)
#'user/b
user=> (type b)
clojure.lang.BigInt
user=> (- a b)
4984967348648376483958636990617995177813633004886119976300237239N
@MichaelDrogalis
MichaelDrogalis / node-1-hornetq-beans.xml
Last active August 29, 2015 14:03
node-1-hornetq-configuration.xml
<?xml version="1.0" encoding="UTF-8"?>
<deployment xmlns="urn:jboss:bean-deployer:2.0">
<!-- MBean server -->
<bean name="MBeanServer" class="javax.management.MBeanServer">
<constructor factoryClass="java.lang.management.ManagementFactory"
factoryMethod="getPlatformMBeanServer"/>
</bean>
(defn run-query [q]
(info "Executing query: " q)
(try
(if-not (nil? q)
(let [result (database/query *db* q)]
(metrics/record-db-hit)
{:error :nil-query})
(catch Exception e
(warn "Error issuing query" q)
{:error :database-error})))
(def conn (start-datomic! (str "datomic:mem://" (java.util.UUID/randomUUID)) (schema)))
(def data
[{:db/id (d/tempid :com.mdrogalis/people)
:user/name "Mike"}
{:db/id (d/tempid :com.mdrogalis/people)
:user/name "Dorrene"}
{:db/id (d/tempid :com.mdrogalis/people)
:user/name "Benti"}
{:db/id (d/tempid :com.mdrogalis/people)
(when-let [x <value>]
(when (not= x ::poison-pill)
<body using x>))
(defspec integers-closed-over-addition
+ ;; input fn
[^int a ^int b] ;; input spec
(assert (integer? %))) ;; 0 or more validator forms
(+ 2 3) ;; => 5
(+ 5 -7) ;; => -2