Skip to content

Instantly share code, notes, and snippets.

View adamw's full-sized avatar

Adam Warski adamw

View GitHub Profile
// Fans every queue message out to all subscribed consumer actors.
// NOTE(review): the class's closing brace is not visible — this fragment is
// truncated, and `connector` is not used in the visible portion.
class BroadcastActor(connector: QueueConnector[Future])
extends Actor with ActorLogging {
// Mutable set is safe here: an actor processes one message at a time.
private var consumers: Set[ActorRef] = Set()
override def receive: Receive = {
// Register a new consumer interested in broadcast messages.
case Subscribe(actor) => consumers += actor
// Forward a received queue message to every registered consumer.
case Received(msg) =>
consumers.foreach(_ ! msg)
}
// NOTE(review): byte-for-byte duplicate of the BroadcastActor snippet above —
// likely the same gist file rendered twice by the page scrape. Also truncated
// (no closing brace for the class).
class BroadcastActor(connector: QueueConnector[Future])
extends Actor with ActorLogging {
// Mutable set is safe here: an actor processes one message at a time.
private var consumers: Set[ActorRef] = Set()
override def receive: Receive = {
// Register a new consumer interested in broadcast messages.
case Subscribe(actor) => consumers += actor
// Forward a received queue message to every registered consumer.
case Received(msg) =>
consumers.foreach(_ ! msg)
}
/** Message asking the broadcaster to add `actor` to its set of consumers.
  *
  * Marked `final`: case classes should not be extended (Scala best practice —
  * inheriting from a case class breaks equals/hashCode symmetry).
  *
  * @param actor the actor that should start receiving broadcast messages
  */
final case class Subscribe(actor: ActorRef)
/** Message carrying a payload read from the queue, to be fanned out to
  * subscribed consumers.
  *
  * Marked `final`: case classes should not be extended (Scala best practice —
  * inheriting from a case class breaks equals/hashCode symmetry).
  *
  * @param msg the raw message text taken off the queue
  */
final case class Received(msg: String)
/** A message queue abstracted over the effect type `F[_]`
  * (e.g. `Future`, `Task`, `IO`), so implementations can pick their runtime.
  */
trait Queue[F[_]] {
// Obtains the next message wrapped in the effect.
// NOTE(review): behavior when the queue is empty (block vs. fail) is not
// visible in this fragment — confirm against the implementation.
def read(): F[String]
// Releases resources held by the queue; `()` kept as this is side-effecting.
def close(): F[Unit]
}
/** Factory for [[Queue]] instances, abstracted over the same effect type
  * `F[_]` so connecting itself is an effect (it may do I/O).
  */
trait QueueConnector[F[_]] {
// Establishes a connection and yields a ready-to-use queue.
def connect: F[Queue[F]]
}
// Fragment of the crawler's main loop (the enclosing `def crawler(...)` is not
// visible in this chunk): take the next message, handle it, then either finish
// or recurse with the updated state.
crawlerQueue.take.flatMap { msg =>
handleMessage(msg, data).flatMap { data2 =>
if (data2.inProgress.isEmpty) {
// Nothing left in flight: cancel all worker fibers, then yield the
// accumulated per-host reference counts as the crawl's result.
data2.workers.values.map(_.fiber.cancel).toList.sequence_
.map(_ => data2.referenceCount)
} else {
// Work remains: loop with the updated crawler state.
crawler(crawlerQueue, data2)
}
}
}
/** Returns the queue of the worker responsible for the given host, spawning a
  * new worker (and recording it in the returned state) if none exists yet.
  *
  * NOTE(review): the parameter is named `url` but typed `Host` — consider
  * renaming for clarity. The method's closing braces are not visible, so this
  * fragment is truncated.
  */
def workerFor(data: CrawlerData, url: Host): Task[(CrawlerData, MQueue[Url])] = {
data.workers.get(url) match {
case None =>
// No worker for this host yet: create its queue and start a worker fiber.
val workerQueue = MQueue.make[Url]
worker(workerQueue, crawlerQueue).map { workerFiber =>
val workerData = WorkerData(workerQueue, workerFiber)
// Record the new worker in the (immutable) crawler state.
val data2 = data.copy(workers = data.workers + (url -> workerData))
(data2, workerQueue)
}
// A worker already exists: reuse its queue, state unchanged.
case Some(wd) => Task.now((data, wd.queue))
/** Per-host worker bookkeeping: the queue the worker consumes from, and the
  * fiber running it (its `cancel` is invoked when the crawl completes).
  *
  * Marked `final`: case classes should not be extended (Scala best practice).
  *
  * @param queue urls pending for this worker's host
  * @param fiber handle to the running worker, used for cancellation
  */
final case class WorkerData(queue: MQueue[Url],
                            fiber: Fiber[Unit])
/** Immutable state threaded through the crawler loop.
  *
  * Marked `final`: case classes should not be extended (Scala best practice).
  *
  * @param referenceCount per-host counters; yielded as the crawl's final result
  * @param visitedLinks   urls already seen — presumably to avoid re-crawling;
  *                       not exercised in the visible fragment, confirm usage
  * @param inProgress     urls currently being crawled; the loop terminates when empty
  * @param workers        one [[WorkerData]] (queue + fiber) per host
  */
final case class CrawlerData(referenceCount: Map[Host, Int],
                             visitedLinks: Set[Url],
                             inProgress: Set[Url],
                             workers: Map[Host, WorkerData])
/** Thin `Task`-based facade over an `AsyncQueue`, so queue operations compose
  * lazily with the rest of the crawler's effects.
  */
class MQueue[T](q: AsyncQueue[T]) {
  /** Dequeues the next element; the underlying future is deferred until the
    * task actually runs.
    */
  def take: Task[T] = Task.deferFuture(q.poll())

  /** Enqueues `t`; wrapped in `Task.eval` so the side effect is suspended
    * until execution.
    */
  def offer(t: T): Task[Unit] = Task.eval(q.offer(t))
}
// Companion constructor: builds an MQueue backed by a fresh, empty AsyncQueue.
// NOTE(review): the object's closing brace is not visible — truncated fragment.
object MQueue {
def make[T]: MQueue[T] = new MQueue(AsyncQueue.empty)
// Builds the crawl program: create a bounded (capacity 32) message queue,
// seed it with the starting url, then run the crawler loop from empty state.
val crawl = for {
crawlerQueue <- IOQueue.make[Nothing, CrawlerMessage](32)
_ <- crawlerQueue.offer[Nothing](Start(crawlUrl))
r <- crawler(crawlerQueue, CrawlerData(Map(), Set(), Set(), Map()))
} yield r
// Runs `crawl` under supervision; presumably the Throwable is used to
// interrupt leftover forked fibers on completion — confirm IO.supervise docs.
IO.supervise(crawl, new RuntimeException)
/** Starts a worker fiber that consumes urls from `workerQueue` and fetches
  * each one over http.
  *
  * NOTE(review): this fragment is truncated — the match continues past the
  * visible lines, so the full error/success handling is not shown here.
  */
def worker(workerQueue: IOQueue[Url],
crawlerQueue: IOQueue[CrawlerMessage]
): IO[Nothing, Fiber[Nothing, Unit]] = {
// Fetches one url; `attempt` captures the failure as a Left so the
// worker's IO keeps the `Nothing` error type (it cannot fail).
def handleUrl(url: Url): IO[Nothing, Unit] = {
http
.get(url)
.attempt[Nothing]
.map {
// The http call failed with throwable `t` (handler body not visible).
case Left(t) =>