Skip to content

Instantly share code, notes, and snippets.

// Pair each integer 1..5 with a Promise[Unit]. `Stream.continually` takes its
// element by name, so every pair gets its own promise; zipping with the finite
// range cuts the infinite stream down to five elements.
val input = (Stream.continually(Promise[Unit]) zip Range.inclusive(1, 5)).toList
// Implicit materializer for the stream below.
// NOTE(review): presumably needs an implicit ActorSystem in scope (not shown here) — confirm.
implicit val materializer = ActorMaterializer()
// Wrap the (promise, value) pairs in an AckedSource.
val source = AckedSource(input)
// Build two separate fold sinks, each adding up Ints starting from 0.
// The loop variable `n` is unused; the range only determines how many sinks are created.
val Seq(s1, s2) = (1 to 2) map { n => AckedSink.fold[Int, Int](0)(_ + _) }
val g = AckedFlowGraph.closed(s1, s2)((m1, m2) => (m1, m2)) { implicit b =>
(s1, s2) =>
// RabbitSource in op-rabbit v1.0.0-M9 returns an AckedSource
val source = RabbitSource(
"Worker",
rabbitMq,
channel(qos = 100),
consume(topic("app.domain.class", topics = List(Pg.Table("MainTable").all, Pg.Table("RelatedTable").all))),
body(as[PgChange]).map(getPrimaryId))
source.
collect { case Some(i) => i }.
@timcharper
timcharper / ack-via-helper.scala
Last active August 29, 2015 14:25
Akka-Stream-Acknowledgements
val source = Source(RabbitSource(
"Worker",
rabbitMq,
channel(qos = 100),
consume(topic("app.domain.class", topics = List(Pg.Table("MainTable").all, Pg.Table("RelatedTable").all))),
body(as[PgChange]).map(getPrimaryId)))
source.
map { case (p, primaryId) => Delivery(p, primaryId) }.
mapConcat { delivery =>
@timcharper
timcharper / MergeUnordered.scala
Last active August 29, 2015 14:13
Unordered Merge
package m
import akka.actor.{ ActorSystem, Props, ActorRef }
import akka.stream.FlowMaterializer
import akka.stream.actor.{RequestStrategy, ActorPublisherMessage, ActorSubscriberMessage, ActorSubscriber, MaxInFlightRequestStrategy, ActorPublisher }
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.stage._
import scala.annotation.tailrec
import scala.collection.mutable.Queue
import scala.util.{Failure, Success}
@timcharper
timcharper / company.el
Last active August 29, 2015 14:12
Some initializers
;; Load company and turn it on in every buffer.
(require 'company)
(global-company-mode t)
;; Global key bindings (the "s-" prefix is the super modifier):
;;   s-;  runs `company-dabbrev'
;;   s-:  runs `company-complete'
(global-set-key (kbd "s-;") 'company-dabbrev)
(global-set-key (kbd "s-:") 'company-complete)
(global-auto-complete-mode 0) ;; Disable auto-complete: it and company fight each other, which is annoying.
@timcharper
timcharper / evil-init.el
Last active October 24, 2015 12:34
Better evil word behavior
(defun forward-evil-word (&optional count)
  "Move point over COUNT symbols with `forward-symbol' (one symbol when COUNT is nil).
Return 0 if point moved, otherwise return COUNT unchanged."
  (let ((start (point)))
    (forward-symbol (or count 1))
    ;; Report 0 when point moved; hand COUNT back untouched when it did not.
    (if (/= (point) start)
        0
      count)))
;; NOTE(review): in evil this option is documented as making the * and # searches
;; use the symbol at point instead of the word — confirm against the installed evil version.
(setq evil-symbol-word-search t)
package main
import akka.actor.{Actor,ActorSystem, Props}
import akka.stream.scaladsl.Source
import akka.stream.FlowMaterializer
import scala.concurrent.Future
import scala.util.{Try,Success,Failure}
// import scala.Predef.identity
object Counter {
package main
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.{Source,ForeachSink,FlowGraph,FlowGraphImplicits}
import akka.actor.ActorSystem
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try
object Main extends App {
import shapeless._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
object FutureReducer extends Poly2 {
implicit def f[A, B <: HList] = at[Future[A], Future[B]] { (f, resultFuture) =>
resultFuture flatMap { r =>
f map { v =>
v :: r
}
@timcharper
timcharper / bayes.scala
Last active August 29, 2015 14:03
Functional Scala Implementation of Bayes Graph Solver (https://www.youtube.com/watch?v=pPTLK5hFGnQ)
#!/bin/sh
exec scala "$0" "$@"
!#
/**
 * Enumeration of the named factors used by this script.
 *
 * Each member is registered with an explicit name, so `toString` yields
 * exactly "Econ" / "Stock"; ids follow declaration order.
 */
object Factor extends Enumeration {

  /** The factor named "Econ". */
  val Econ: Value = Value("Econ")

  /** The factor named "Stock". */
  val Stock: Value = Value("Stock")
}
import Factor._