Last active
June 11, 2017 21:15
-
-
Save shijinkui/6118560 to your computer and use it in GitHub Desktop.
akka worker demo Pi.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> | |
| */ | |
| package com.sohu.smc.worker; | |
| import akka.actor.ActorRef; | |
| import akka.actor.ActorSystem; | |
| import akka.actor.Props; | |
| import akka.actor.UntypedActor; | |
| import akka.actor.UntypedActorFactory; | |
| import akka.routing.RoundRobinRouter; | |
| import akka.util.Duration; | |
| import java.util.concurrent.TimeUnit; | |
| public class Pi { | |
| public static void main(String[] args) { | |
| Pi pi = new Pi(); | |
| pi.calculate(4, 10000, 10000); | |
| } | |
| public void calculate( | |
| final int nrOfWorkers, | |
| final int nrOfElements, | |
| final int nrOfMessages) { | |
| // Create an Akka system | |
| ActorSystem system = ActorSystem.create("PiSystem"); | |
| // create the result listener, which will print the result and shutdown the system | |
| final ActorRef listener = system.actorOf(new Props(Listener.class), "listener"); | |
| // create the master | |
| ActorRef master = system.actorOf(new Props(new UntypedActorFactory() { | |
| private static final long serialVersionUID = -6926377568173193329L; | |
| public UntypedActor create() { | |
| return new Master(nrOfWorkers, nrOfMessages, nrOfElements, listener); | |
| } | |
| }), "master"); | |
| // start the calculation | |
| System.out.println("1. start calculate"); | |
| master.tell(new Calculate()); | |
| } | |
| //////////////////////////////////////////////////// | |
| static class Calculate { | |
| } | |
| static class Task { | |
| private final int start; | |
| private final int nrOfElements; | |
| public Task(int start, int nrOfElements) { | |
| this.start = start; | |
| this.nrOfElements = nrOfElements; | |
| } | |
| public int getStart() { | |
| return start; | |
| } | |
| public int getNrOfElements() { | |
| return nrOfElements; | |
| } | |
| } | |
| static class Result { | |
| private final double value; | |
| public Result(double value) { | |
| this.value = value; | |
| } | |
| public double getValue() { | |
| return value; | |
| } | |
| } | |
| static class PiApproximation { | |
| private final double pi; | |
| private final Duration duration; | |
| public PiApproximation(double pi, Duration duration) { | |
| this.pi = pi; | |
| this.duration = duration; | |
| } | |
| public double getPi() { | |
| return pi; | |
| } | |
| public Duration getDuration() { | |
| return duration; | |
| } | |
| } | |
| //接受任务处理, 处理完发送消息--> listener | |
| public static class Worker extends UntypedActor { | |
| private double calculatePiFor(int start, int nrOfElements) { | |
| double acc = 0.0; | |
| for (int i = start * nrOfElements; i <= ((start + 1) * nrOfElements - 1); i++) { | |
| acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1); | |
| } | |
| return acc; | |
| } | |
| public void onReceive(Object message) { | |
| if (message instanceof Task) { | |
| Task task = (Task) message; | |
| System.out.println(Thread.currentThread().getName() + "||task: " + task.getStart() + "--" + task.getNrOfElements()); | |
| double result = calculatePiFor(task.getStart(), task.getNrOfElements()); | |
| System.out.println("3. send result to master"); | |
| getSender().tell(new Result(result), getSelf()); | |
| } else { | |
| unhandled(message); | |
| } | |
| } | |
| } | |
| public static class Master extends UntypedActor { | |
| private final int nrOfMessages; | |
| private final int nrOfElements; | |
| private double pi; | |
| private int nrOfResults; | |
| private final long start = System.currentTimeMillis(); | |
| private final ActorRef listener; | |
| private final ActorRef workerRouter; | |
| public Master( | |
| final int nrOfWorkers, | |
| int nrOfMessages, | |
| int nrOfElements, | |
| ActorRef listener) { | |
| this.nrOfMessages = nrOfMessages; | |
| this.nrOfElements = nrOfElements; | |
| this.listener = listener; | |
| workerRouter = this.getContext().actorOf(new Props(Worker.class).withRouter( | |
| new RoundRobinRouter(nrOfWorkers)), "workerRouter"); | |
| } | |
| public void onReceive(Object message) { | |
| if (message instanceof Calculate) { | |
| for (int start = 0; start < nrOfMessages; start++) { | |
| //send msg to worker | |
| workerRouter.tell(new Task(start, nrOfElements), getSelf()); | |
| System.out.println("2. send msg to worker"); | |
| } | |
| } else if (message instanceof Result) { | |
| System.out.println("4. master receive result, merge it."); | |
| Result result = (Result) message; | |
| pi += result.getValue(); | |
| nrOfResults += 1; | |
| if (nrOfResults == nrOfMessages) { | |
| // Send the result to the listener | |
| System.out.println("5. master send the merged result to listener"); | |
| Duration duration = Duration.create(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS); | |
| listener.tell(new PiApproximation(pi, duration), getSelf()); | |
| // Stops this actor and all its supervised children | |
| getContext().stop(getSelf()); | |
| } | |
| } else { | |
| unhandled(message); | |
| } | |
| } | |
| } | |
| public static class Listener extends UntypedActor { | |
| public void onReceive(Object message) { | |
| if (message instanceof PiApproximation) { | |
| PiApproximation approximation = (PiApproximation) message; | |
| System.out.println(String.format("\n\tPi 近似值: \t\t%s\n\tCalculation time: \t%s", | |
| approximation.getPi(), approximation.getDuration())); | |
| getContext().system().shutdown(); | |
| } else { | |
| unhandled(message); | |
| } | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment