Skip to content

Instantly share code, notes, and snippets.

(defn map-reduce [map-fn reduce-fn n xs]
(let [bspec {:type :mem :merge reduce-fn}
bs (repeatedly n #(bucket/new bspec))
work (fn [b x]
(doseq [[k v] (map-fn x)]
(bucket/merge b k v)))
workers (map #(partial work %) bs)]
;; workers process xs in parallel, blocking
(do-work workers xs)
;; merge all bucket users
;; `bigrams` is a bucket whose keys are words and whose values are
;; maps of word counts.
(def bigrams
  (let [sum-counts (fn [& count-maps]
                     ;; combine value maps, adding counts for shared keys
                     (apply merge-with + count-maps))]
    (bucket/new {:merge sum-counts
                 :type  :mem})))
;; For each word, count following words
(doseq [[word next-word]
;; Bucket of bigram counts: each key is a word and each value is a map
;; of word counts. The bucket's merge operation combines value maps
;; with + (merge-with +).
(def bigrams
  (bucket/new
    {:merge #(apply merge-with + %&)
     :type  :mem}))
;; For each word, count following words
(doseq [[before after]
=ERROR REPORT==== 29-Apr-2011::04:46:03 ===
[{application,mochiweb},"Accept failed error","{error,emfile}"]
=CRASH REPORT==== 29-Apr-2011::04:46:03 ===
crasher:
initial call: mochiweb_acceptor:init/3
pid: <0.3715.4>
registered_name: []
exception exit: {error,accept_failed}
in function mochiweb_acceptor:init/3
in call from proc_lib:init_p_do_apply/3
=CRASH REPORT==== 29-Apr-2011::02:52:28 ===
crasher:
initial call: riak_core_vnode:init/1
pid: <0.1424.0>
registered_name: []
exception exit: {{badmatch,{error,emfile}},[{bitcask,scan_key_files,3},{bitcask,init_keydir,2},{bitcask,open,2},{riak_kv_bitcask_backend,
start,2},{riak_kv_vnode,init,1},{riak_core_vnode,init,1},{gen_fsm,init_it,6},{proc_lib,init_p_do_apply,3}]}
in function gen_fsm:init_it/6
in call from proc_lib:init_p_do_apply/3
ancestors: [riak_core_vnode_sup,riak_core_sup,<0.101.0>]
@aria42
aria42 / bad.erl
Created April 18, 2011 23:35
riak console fail
[{application,riak_core},{exited,{bad_return,{{riak_core_app,start,[normal,[]]},{EXIT,{badarg,[{erlang,binary_to_term,[<<>>]},{riak_core_ring_manager,read_ringfile,1},{riak_core_app,start,2},{application_master,start_it_old,4}]}}}}},{type,permanent}]/usr/lib/riak/lib/os_mon-2.2.5/priv/bin/memsup: Erlang has closed.
Erlang has closed
{"Kernel pid terminated",application_controller,"{application_start_failure,riak_core,{bad_return,{{riak_core_app,start,[normal,[]]},{'EXIT',{badarg,[{erlang,binary_to_term,[<<>>]},{riak_core_ring_manager,read_ringfile,1},{riak_core_app,start,2},{application_master,start_it_old,4}]}}}}}"}
heart: Mon Apr 18 23:31:09 2011: Erlang has closed.
heart: Mon Apr 18 23:31:09 2011: Would reboot. Terminating.
Kernel pid terminated (application_controller) ({application_start_failure,riak_core,{bad_return,{{riak_core_app,start,[normal,[]]},{'EXIT',{b
root (-> (dp/poller-graph s)
(add-edge (fn [x]
(log/info (format "Finished an entry %s has body: %s"
(-> x :entry :link) (not (nil? (-> x :entry :body)))))))
zip/root graph-zip zip/down zip-down ; Move to feed-fetch-node
(add-edge (fn [{k :key b :body}]
(let [es (set (feeds/extract-entries b))]
(swap! fetched-feeds conj k)
(swap! extracted-entries set/union es)
(log/info (format "Added %d entries from %s." (count es) k))))))
(ns work.graph
(:require
[clojure.contrib.logging :as log]
[clojure.zip :as zip]
[work.core :as work]
[clojure.contrib.zip-filter :as zf]
[work.queue :as workq]))
(defn table
"takes kv pairs.
(ns work.graph
(:require
[clojure.contrib.logging :as log]
[clojure.zip :as zip]
[work.core :as work]
[work.queue :as workq]))
(defn table
"takes kv pairs.
keys can be vectors of keys, matching a fn.
(ns work.aggregators
(:use [plumbing.core]
[work.core :only [work available-processors map-work queue-work]]
[work.queue :only [local-queue]]))
(defn- channel-as-lazy-seq
"ch is a fn you call to get an item (no built-in timeout, use with-timeout).
When the ch is exhausted it returns :eof.
This fn returns a lazy sequence from the channel."