Skip to content

Instantly share code, notes, and snippets.

View adamw's full-sized avatar

Adam Warski adamw

View GitHub Profile
class ZioRateLimiter(queue: IOQueue[RateLimiterMsg],
runQueueFiber: Fiber[Nothing, Unit]) {
/**
 * Runs `f` through the rate limiter, yielding an effect with the same error and result type as `f`.
 *
 * A fresh promise is created to carry the outcome of `f`. An effect that, when executed, forks a
 * fiber running `f` and routes its success or failure into that promise is wrapped in `Schedule`
 * and offered to `queue`. The returned effect then awaits the promise via `get`.
 */
def runLimited[E, T](f: IO[E, T]): IO[E, T] =
  Promise.make[E, T].flatMap { promise =>
    // Both outcomes of `f` end up in the promise; `catchAll[Nothing]` leaves the forked task
    // with no error channel of its own.
    val scheduled =
      f.flatMap(promise.complete)
        .catchAll[Nothing](promise.error)
        .fork[Nothing]
        .toUnit
    queue.offer[E](Schedule(scheduled)).flatMap(_ => promise.get)
  }
/**
 * A task message, presumably consumed by a rate limiter's processing loop.
 * `F` is the type of the runnable payload carried by [[Run]].
 */
sealed trait RateLimiterTask[F]

/**
 * Carries the payload `run` to be executed.
 */
final case class Run[F](run: F) extends RateLimiterTask[F]

/**
 * Carries only a delay `millis` (milliseconds, judging by the name — NOTE(review): confirm
 * against the consumer). `F` is not used by this case; it only fixes the ADT's payload type.
 */
final case class RunAfter[F](millis: Long) extends RateLimiterTask[F]
/** A host name, kept as a plain `String`. */
type Host = String

/**
 * A URL split into its `host` and `path` parts.
 */
final case class Url(host: Host, path: String)
/**
 * Minimal HTTP client abstracted over the effect type `F` (the crawler uses `Future`).
 * `get` yields the response for `url` as a `String` — presumably the page body; TODO confirm.
 */
trait Http[F[_]] { def get(url: Url): F[String] }
/** Turns a fetched page's text into the URLs it links to (presumably; see `parseLinks(body)` in the worker). */
type LinkParser = Function1[String, List[Url]]
/**
 * Actor that coordinates a crawl. Only its constructor and state fields are visible in this
 * snippet. NOTE(review): the class body is not closed here; the rest is elided.
 *
 * @param http       HTTP client whose `get` returns a `Future`
 * @param parseLinks extracts the links from a page's text (presumably the body fetched via `http`)
 * @param result     promise for a per-host count map — presumably completed with
 *                   `referenceCount` once crawling finishes; completion is not visible here
 */
class Crawler(http: Http[Future],
parseLinks: String => List[Url],
result: Promise[Map[Host, Int]]) extends Actor {
// Per-host count, updated for each link reported in a CrawlResult.
var referenceCount = Map[Host, Int]()
// URLs already seen — presumably used by crawlUrl to avoid re-crawling; TODO confirm.
var visitedLinks = Set[Url]()
// URLs still awaiting their CrawlResult (the URL is removed when its result arrives).
var inProgress = Set[Url]()
// Worker actor per host — presumably created on demand in crawlUrl; TODO confirm.
var workers = Map[Host, ActorRef]()
// ...
/**
 * Messages handled by the crawler actor's `receive`.
 */
sealed trait CrawlerMessage
/**
 * Start the crawling process for the given URL. Should be sent only once.
 */
final case class Start(url: Url) extends CrawlerMessage
/**
 * Reports that `url` has been processed, together with the `links` found on it
 * (presumably as produced by `parseLinks`).
 */
final case class CrawlResult(url: Url, links: List[Url]) extends CrawlerMessage
// Crawler message handler.
// NOTE(review): this snippet is truncated — the arguments of `referenceCount.updated(...)`
// and the closing braces are missing here.
override def receive: Receive = {
// Begin crawling from the initial URL.
case Start(start) =>
crawlUrl(start)
// A URL's result arrived: it is no longer in progress. Each link found on it is passed to
// crawlUrl and its host's entry in referenceCount is updated (new value not visible here —
// presumably incremented).
case CrawlResult(url, links) =>
inProgress -= url
links.foreach { link =>
crawlUrl(link)
referenceCount = referenceCount.updated(link.host,
/**
 * Worker actor that fetches pages — presumably one per host, as kept in the crawler's `workers`
 * map. Its message handling is elided from this snippet.
 *
 * @param http       HTTP client whose `get` returns a `Future`
 * @param parseLinks extracts the links from a fetched page's text
 * @param master     actor reference, presumably the crawler that receives results; TODO confirm
 */
class Worker(
  http: Http[Future],
  parseLinks: String => List[Url],
  master: ActorRef
) extends Actor with ActorLogging {
  // URLs waiting to be fetched; each Crawl message appends one.
  var urlsPending = Vector.empty[Url]
  // Set while an HTTP GET is outstanding; reset when its result arrives.
  var getInProgress: Boolean = false
  // ...
}
/**
 * Messages handled by the worker actor's `receive`.
 */
sealed trait WorkerMessage
/**
 * Queues `url` for fetching by the worker.
 */
final case class Crawl(url: Url) extends WorkerMessage
/**
 * Outcome of the HTTP GET for `url`: the response text on success, or the failure.
 */
final case class HttpGetResult(url: Url, result: Try[String]) extends WorkerMessage
override def receive: Receive = {
case Crawl(url) =>
urlsPending = urlsPending :+ url
startHttpGetIfPossible()
case HttpGetResult(url, Success(body)) =>
getInProgress = false
startHttpGetIfPossible()
val links = parseLinks(body)