This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class BroadcastActor(connector: QueueConnector[Future]) | |
extends Actor with ActorLogging { | |
private var consumers: Set[ActorRef] = Set() | |
/** Message handler for the broadcaster.
  *
  *  - `Subscribe(actor)` registers `actor` as a consumer and starts death-watching it.
  *  - `Terminated(actor)` removes a stopped consumer, so the set does not keep growing and
  *    broadcasts are no longer sent to actors that have already stopped.
  *  - `Received(msg)` forwards `msg` to every currently registered consumer.
  */
override def receive: Receive = {
  case Subscribe(actor) =>
    // Watching an already-watched actor is harmless, so re-subscribing is safe.
    context.watch(actor)
    consumers += actor

  // Fully qualified so this block does not rely on an extra import being present.
  case akka.actor.Terminated(actor) =>
    consumers -= actor

  case Received(msg) =>
    consumers.foreach(_ ! msg)
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class BroadcastActor(connector: QueueConnector[Future]) | |
extends Actor with ActorLogging { | |
private var consumers: Set[ActorRef] = Set() | |
/** Message handler for the broadcaster.
  *
  *  - `Subscribe(actor)` registers `actor` as a consumer and starts death-watching it.
  *  - `Terminated(actor)` removes a stopped consumer, so the set does not keep growing and
  *    broadcasts are no longer sent to actors that have already stopped.
  *  - `Received(msg)` forwards `msg` to every currently registered consumer.
  */
override def receive: Receive = {
  case Subscribe(actor) =>
    // Watching an already-watched actor is harmless, so re-subscribing is safe.
    context.watch(actor)
    consumers += actor

  // Fully qualified so this block does not rely on an extra import being present.
  case akka.actor.Terminated(actor) =>
    consumers -= actor

  case Received(msg) =>
    consumers.foreach(_ ! msg)
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Message asking `BroadcastActor` to add `actor` to its set of consumers.
  *
  * @param actor the actor that should start receiving broadcast messages
  */
case class Subscribe(
  actor: ActorRef
)
/** Message carrying a payload that `BroadcastActor` forwards to every registered consumer.
  *
  * @param msg the payload to forward
  */
case class Received(
  msg: String
)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** A queue of `String` messages whose operations are expressed in the effect type `F`.
  *
  * @tparam F the effect wrapper in which results are returned
  */
trait Queue[F[_]] {

  /** Reads a message from the queue, wrapped in `F`. */
  def read(): F[String]

  /** Closes the queue; completion is reported through `F`. */
  def close(): F[Unit]
}
/** Factory for [[Queue]] instances, producing them inside the effect type `F`.
  *
  * @tparam F the effect wrapper shared by the connector and the queues it yields
  */
trait QueueConnector[F[_]] {

  /** Yields a `Queue[F]` wrapped in `F`. */
  def connect: F[Queue[F]]
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// One step of the crawler loop: take the next message, fold it into the crawler state, then
// either recurse with the updated state or finish when nothing is in progress any more.
crawlerQueue.take
  .flatMap(msg => handleMessage(msg, data))
  .flatMap { updated =>
    if (updated.inProgress.nonEmpty) {
      // Work is still outstanding: keep consuming messages with the new state.
      crawler(crawlerQueue, updated)
    } else {
      // Nothing left in progress: cancel every worker fiber, then yield the reference counts.
      val cancelWorkers = updated.workers.values.map(_.fiber.cancel).toList.sequence_
      cancelWorkers.map(_ => updated.referenceCount)
    }
  }
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def workerFor(data: CrawlerData, url: Host): Task[(CrawlerData, MQueue[Url])] = { | |
data.workers.get(url) match { | |
case None => | |
val workerQueue = MQueue.make[Url] | |
worker(workerQueue, crawlerQueue).map { workerFiber => | |
val workerData = WorkerData(workerQueue, workerFiber) | |
val data2 = data.copy(workers = data.workers + (url -> workerData)) | |
(data2, workerQueue) | |
} | |
case Some(wd) => Task.now((data, wd.queue)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Bookkeeping kept by the crawler for a single worker.
  *
  * @param queue the queue of URLs associated with the worker
  * @param fiber the worker's running fiber, which the crawler cancels when it finishes
  */
case class WorkerData(
  queue: MQueue[Url],
  fiber: Fiber[Unit]
)
/** Immutable state threaded through the crawler loop.
  *
  * @param referenceCount per-host counters; this is the value the crawler yields when it finishes
  * @param visitedLinks   URLs recorded as visited
  * @param inProgress     URLs still in progress; the crawler finishes once this set is empty
  * @param workers        worker bookkeeping, keyed by host
  */
case class CrawlerData(
  referenceCount: Map[Host, Int],
  visitedLinks: Set[Url],
  inProgress: Set[Url],
  workers: Map[Host, WorkerData]
)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Thin `Task`-based wrapper around an `AsyncQueue`.
  *
  * @param q the underlying queue that all operations delegate to
  * @tparam T the element type
  */
class MQueue[T](q: AsyncQueue[T]) {

  /** A task that, when run, polls the underlying queue and completes with the polled element. */
  def take: Task[T] = Task.deferFuture(q.poll())

  /** A task that, when run, offers `t` to the underlying queue. */
  def offer(t: T): Task[Unit] = Task.eval(q.offer(t))
}
object MQueue { | |
/** Creates a new [[MQueue]] backed by a fresh, empty `AsyncQueue`. */
def make[T]: MQueue[T] = {
  val underlying = AsyncQueue.empty[T]
  new MQueue[T](underlying)
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Crawler state at start-up: every map and set is empty.
val initialData = CrawlerData(Map(), Set(), Set(), Map())

// Create the crawler's message queue (constructed with 32), enqueue the Start message
// for `crawlUrl`, then run the crawler loop to completion.
val crawl = for {
  queue  <- IOQueue.make[Nothing, CrawlerMessage](32)
  _      <- queue.offer[Nothing](Start(crawlUrl))
  result <- crawler(queue, initialData)
} yield result

// Run the whole crawl under supervision, supplying a RuntimeException to the supervisor.
IO.supervise(crawl, new RuntimeException)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def worker(workerQueue: IOQueue[Url], | |
crawlerQueue: IOQueue[CrawlerMessage] | |
): IO[Nothing, Fiber[Nothing, Unit]] = { | |
def handleUrl(url: Url): IO[Nothing, Unit] = { | |
http | |
.get(url) | |
.attempt[Nothing] | |
.map { | |
case Left(t) => |