Created
October 13, 2014 03:18
-
-
Save nicholasren/0c01a4e79b1658704d2f to your computer and use it in GitHub Desktop.
a small example of real time indexing and searching engine
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package actors | |
import akka.actor._ | |
import scala.concurrent.{Future, ExecutionContext} | |
import scala.concurrent.duration._ | |
import ExecutionContext.Implicits.global | |
import java.io.{PrintWriter, FileOutputStream} | |
import rx.lang.scala.Observable | |
import rx.lang.scala.Observable._ | |
sealed trait IndexAction | |
case class Index(id: String) | |
case class ReIndex(id: String) | |
case class UnIndex(id: String) | |
class Indexer extends Actor { | |
override def receive: Actor.Receive = { | |
case UnIndex(id) => IndexService.unindex(id) | |
case ReIndex(id) => IndexService.reindex(id) | |
case Index(id) => IndexService.index(id) | |
} | |
} | |
object IndexService { | |
def unindex(id: String) = println(s"Unindexing $id") | |
def reindex(id: String) = println(s"Reindexing $id") | |
def index(id: String) = println(s"Indexing $id") | |
} | |
case class ListingAction(id: String, title: String, action: String) | |
class ListingExtractor extends Actor { | |
val indexer = context.system.actorOf(Props[Indexer]) | |
override def receive: Actor.Receive = { | |
case ids: List[String] => { | |
ids. | |
map(Service.fetchDetail(_)). | |
map( | |
_.map { | |
detail => | |
{ | |
detail.action match { | |
case "create" => indexer ! Index(detail.id) | |
case "delete" => indexer ! UnIndex(detail.id) | |
case "update" => indexer ! ReIndex(detail.id) | |
} | |
Service.persist(detail) | |
} | |
} | |
) | |
} | |
} | |
} | |
object Service { | |
val rnd = new scala.util.Random | |
val output = new PrintWriter(new FileOutputStream("/tmp/fetched-listings.txt"), true) | |
val actions = List("delete", "update", "create") | |
def randomDelay[A](f: => A) = { | |
val i = rnd.nextInt(10) | |
Thread.sleep(i * 100) | |
f | |
} | |
def fetchDetail(id: String): Future[ListingAction] = randomDelay { | |
Future { | |
val i = rnd.nextInt(3) | |
new ListingAction(id, s"listing-$id", actions(i)) | |
} | |
} | |
def persist(detail: ListingAction) = randomDelay { | |
output.println(detail.toString) | |
} | |
def pollChanges(i: Long): Future[List[String]] = randomDelay { | |
Future { | |
val d = rnd.nextInt(10) | |
List(s"1000$d$i", s"10000$d$i") | |
} | |
} | |
} | |
object Main extends App { | |
val system = ActorSystem("ActorSys") | |
val extractor = system.actorOf(Props[ListingExtractor], "extractor") | |
val changes: Observable[List[String]] = interval(1 seconds).map { | |
i => Service.pollChanges(i) | |
}.flatMap(from(_)) | |
changes.subscribe(ids => { | |
extractor ! ids | |
}) | |
} | |
//boot from microkernel | |
//class Boot extends Bootable { | |
// | |
// val system = ActorSystem("ActorSys") | |
// val extractor = system.actorOf(Props[ListingExtractor], "extractor") | |
// | |
// def startup = { | |
// | |
// val changes: Observable[List[String]] = interval(1 seconds).map { | |
// i => Service.pollChanges(i) | |
// }.flatMap(from(_)) | |
// | |
// | |
// changes.subscribe(ids => { | |
// extractor ! ids | |
// }) | |
// } | |
// | |
// def shutdown = { | |
// system.shutdown() | |
// } | |
//} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment