Skip to content

Instantly share code, notes, and snippets.

@oubiwann
Last active October 23, 2015 21:05
Show Gist options
  • Select an option

  • Save oubiwann/14050e4e3bba8fba1d56 to your computer and use it in GitHub Desktop.

Select an option

Save oubiwann/14050e4e3bba8fba1d56 to your computer and use it in GitHub Desktop.
Pulsar gen-event Demo
(ns event-demo.clj
(:require [clojure.tools.logging :as log]
[clojure.core.match :refer [match]]
[co.paralleluniverse.pulsar.core :as pulsar]
[co.paralleluniverse.pulsar.actors :as actors])
(:refer-clojure :exclude [promise await bean])
(:import [co.paralleluniverse.common.util Debug]
[co.paralleluniverse.actors LocalActor]))
;;
;; Demonstrate a simple event handler
;;
(defn handler-echo [event-data] (log/debug (str "Event data: " event-data)))
(def event-server (actors/spawn (actors/gen-event)))
(actors/add-handler! event-server handler-echo)
(actors/notify! event-server "some event data")
(actors/notify! event-server "some more event data")
(actors/remove-handler! event-server handler-echo)
(actors/shutdown! event-server)
(ns event-demo.clj
(:require [clojure.tools.logging :as log]
[clojure.core.match :refer [match]]
[co.paralleluniverse.pulsar.core :as pulsar]
[co.paralleluniverse.pulsar.actors :as actors])
(:refer-clojure :exclude [promise await bean])
(:import [co.paralleluniverse.common.util Debug]
[co.paralleluniverse.actors LocalActor]))
;;
;; Demonstrate controled flow of execution with a dispatching event handler
;; that will execute in the proper order, even with async operations
;;
(defn init-job-track [service result]
(log/debug "Starting job tracking ...")
;; execute code to initialize job tracking
;; then run the job
(actors/notify! service {:type :job-run
:result nil
:service service}))
(defn run-job [service result]
;; execute code that runs the job
(log/debug "Running the job ...")
;; ...
(log/debug "Finished job.")
;; now send the result:
(actors/notify! service {:type :job-save-data
:result "<job data>"
:service service}))
(defn save-job-data [service result]
;; update results data store
(log/debug "Saved job data.")
;; the update returns an id that you can use to refer to this job -- pass
;; that on
(log/debug "Got <insert id>.")
(actors/notify! service {:type :job-track-finish
:result "<insert id>"
:service service}))
(defn finish-job-track [service result]
;; update tracking data with information on completed job
(log/debug (str "Updated job traking data with " result))
(actors/notify! service {:type :done
:result result
:service service}))
(defn done [service result]
;; Perform any needed cleanup
(log/debug "Finished job tracking."))
(defn dispatch-handler [{type :type result :result service :service}]
(match [type]
[:job-track-init] (init-job-track service result)
[:job-run] (run-job service result)
[:job-save-data] (save-job-data service result)
[:job-track-finish] (finish-job-track service result)
[:done] (done service result)))
(def event-server (actors/spawn (actors/gen-event)))
(actors/add-handler! event-server dispatch-handler)
(actors/notify! event-server {:type :job-track-init
:result nil
:service event-server})
(actors/remove-handler! event-server dispatch-handler)
(actors/shutdown! event-server)
(ns event-demo.clj
(:require [clojure.tools.logging :as log]
[clojure.core.match :refer [match]]
[co.paralleluniverse.pulsar.core :as pulsar]
[co.paralleluniverse.pulsar.actors :as actors]
[clj-commons-exec :as exec])
(:refer-clojure :exclude [promise await bean])
(:import [co.paralleluniverse.common.util Debug]
[co.paralleluniverse.actors LocalActor]
[co.paralleluniverse.strands Strand]))
;;
;; Demonstrate controled flow of execution with a dispatching event handler
;; that will execute in the proper order, even with async operations
;;
(defn long-running-func []
@(exec/sh ["sleep" "5"])
(exec/sh ["ls" "-l"]))
(pulsar/defsfn init-job-track [service result]
(log/debug "Starting job tracking ...")
;; execute code to initialize job tracking
;; then run the job
(actors/notify! service {:type :job-run
:result nil
:service service}))
(pulsar/defsfn run-job [service result]
;; execute code that runs the job
(log/debug "Running the job ...")
;; ...
(let [job-data @(long-running-func)]
;; job-data (pulsar/join long-runner)]
(log/debug "Finished job.")
;; now send the result:
(actors/notify! service {:type :job-save-data
:result job-data
:service service})))
(pulsar/defsfn save-job-data [service result]
;; update results data store
(log/debug (str "Saved job data <" result ">."))
;; the update returns an id that you can use to refer to this job -- pass
;; that on
(log/debug "Got <insert id>.")
(actors/notify! service {:type :job-track-finish
:result "<insert id>"
:service service}))
(pulsar/defsfn finish-job-track [service result]
;; update tracking data with information on completed job
(log/debug (str "Updated job traking data with " result))
(actors/notify! service {:type :done
:result result
:service service}))
(pulsar/defsfn done [service result]
;; Perform any needed cleanup
(log/debug "Finished job tracking."))
(pulsar/defsfn receive-handler [{type :type result :result service :service}]
(match [type]
[:job-track-init] (init-job-track service result)
[:job-run] (run-job service result)
[:job-save-data] (save-job-data service result)
[:job-track-finish] (finish-job-track service result)
[:done] (done service result)))
(def event-server (actors/spawn (actors/gen-event)))
(actors/add-handler! event-server receive-handler)
(actors/notify! event-server {:type :job-track-init :service event-server})
(actors/remove-handler! event-server dispatch-handler)
(actors/shutdown! event-server)
(defproject event-demo "0.1.0-dev"
:description "Pulsar gen-event Demo"
:url "https://TBD"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.7.0"]
[org.clojure/tools.logging "0.3.1"]
[org.clojure/core.match "0.3.0-alpha4"]
[co.paralleluniverse/pulsar "0.7.3"]
[ch.qos.logback/logback-classic "1.1.3"]
[org.clojars.hozumi/clj-commons-exec "1.2.0"]]
:java-agents [[co.paralleluniverse/quasar-core "0.7.3"]]
:source-paths ["."]
:jvm-opts ["-Dco.paralleluniverse.fibers.detectRunawayFibers=false"])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment