Created
April 3, 2012 14:44
-
-
Save fehmicansaglam/2292578 to your computer and use it in GitHub Desktop.
Akka ya da Java ExecutorService ve Hazelcast ile kullanıcı bildirim kuyruğu
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Producer extends UntypedActor { | |
private static final Gson gson = new GsonBuilder().create(); | |
private final String[] keys = {"user1", "user2", "user3", "user4", "user5", "user6"}; | |
@Override | |
public void onReceive(Object message) throws Exception { | |
if (message instanceof Integer) { | |
Integer opCount = (Integer) message; | |
final Random random = new Random(); | |
final MultiMap<String, String> notificationMap = HazelcastManager.getNotificationMap(); | |
final IMap<String, String> notificationMetadataMap = HazelcastManager.getNotificationMetadataMap(); | |
for (int i = 0; i < opCount; ++i) { | |
final String key = keys[random.nextInt(keys.length)]; | |
final String text = UUID.randomUUID().toString(); | |
final long timestamp = System.currentTimeMillis(); | |
final Notification notification = new Notification(timestamp, key, text); | |
notificationMap.lock(key); | |
notificationMetadataMap.lock(key); | |
notificationMap.put(key, gson.toJson(notification)); | |
QueueMetadata metadata = gson.fromJson(notificationMetadataMap.get(key), QueueMetadata.class); | |
if (metadata == null) { | |
metadata = new QueueMetadata(1, timestamp); | |
} else { | |
metadata = metadata.increment(); | |
} | |
notificationMetadataMap.put(key, gson.toJson(metadata)); | |
notificationMetadataMap.unlock(key); | |
notificationMap.unlock(key); | |
} | |
getSender().tell("finished", getSelf()); | |
} else { | |
unhandled(message); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public final class Producer extends LatchWorker { | |
private final int opCount; | |
private final String[] keys = {"user1", "user2", "user3", "user4", "user5", "user6"}; | |
private final Gson gson = new GsonBuilder().create(); | |
public Producer(int opCount, CountDownLatch startGate, CountDownLatch endGate, int id) { | |
super(startGate, endGate, id); | |
this.opCount = opCount; | |
} | |
@Override | |
void doJob() { | |
final MultiMap<String, String> notificationMap = HazelcastManager.getNotificationMap(); | |
final IMap<String, String> notificationMetadataMap = HazelcastManager.getNotificationMetadataMap(); | |
final Random random = new Random(); | |
for (int i = 0; i < opCount; ++i) { | |
final String key = keys[random.nextInt(keys.length)]; | |
notificationMap.lock(key); | |
notificationMetadataMap.lock(key); | |
final String text = UUID.randomUUID().toString(); | |
final long timestamp = System.currentTimeMillis(); | |
final Notification notification = new Notification(timestamp, key, text); | |
notificationMap.put(key, gson.toJson(notification)); | |
QueueMetadata metadata = gson.fromJson(notificationMetadataMap.get(key), QueueMetadata.class); | |
if (metadata == null) { | |
metadata = new QueueMetadata(1, timestamp); | |
} else { | |
metadata = (QueueMetadata) metadata.increment(); | |
} | |
notificationMetadataMap.put(key, gson.toJson(metadata)); | |
notificationMetadataMap.unlock(key); | |
notificationMap.unlock(key); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public final class HazelcastManager { | |
private static final HazelcastInstance client; | |
private static final String NOTIFICATION_MAP = "notification-map"; | |
private static final String NOTIFICATION_METADATA_MAP = "notification-metadata-map"; | |
static { | |
ClientConfig clientConfig = new ClientConfig(); | |
clientConfig.getGroupConfig().setName("dev").setPassword("dev-pass"); | |
clientConfig.addAddress("10.35.1.43:5701"); | |
client = HazelcastClient.newHazelcastClient(clientConfig); | |
getNotificationMap().clear(); | |
getNotificationMetadataMap().clear(); | |
} | |
private HazelcastManager() { | |
} | |
public static MultiMap<String, String> getNotificationMap() { | |
return client.getMultiMap(NOTIFICATION_MAP); | |
} | |
public static IMap<String, String> getNotificationMetadataMap() { | |
return client.getMap(NOTIFICATION_METADATA_MAP); | |
} | |
public static void shutdown(){ | |
client.getLifecycleService().shutdown(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public abstract class LatchWorker implements Runnable { | |
protected final int id; | |
private final CountDownLatch startGate; | |
private final CountDownLatch endGate; | |
public LatchWorker(CountDownLatch startGate, CountDownLatch endGate, int id) { | |
this.id = id; | |
this.startGate = startGate; | |
this.endGate = endGate; | |
} | |
@Override | |
public void run() { | |
System.out.println("Thread[" + this.id + "]: waiting for latch"); | |
try { | |
startGate.await(); | |
System.out.println("Thread[" + this.id + "]: started"); | |
doJob(); | |
System.out.println("Thread[" + this.id + "] completed"); | |
endGate.countDown(); | |
} catch (InterruptedException ex) { | |
} | |
} | |
abstract void doJob(); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Main { | |
private static final int threadCount = 8; | |
private static final int opCount = 16384 / threadCount; | |
private static void runJava() throws InterruptedException { | |
final CountDownLatch startGate = new CountDownLatch(1); | |
final CountDownLatch endGate = new CountDownLatch(threadCount); | |
final ExecutorService executor = Executors.newFixedThreadPool(threadCount + 1); | |
for (int i = 0; i < threadCount; ++i) { | |
executor.execute(new workers.java.Producer(opCount, startGate, endGate, i)); | |
} | |
long start = System.nanoTime(); | |
startGate.countDown(); | |
endGate.await(); | |
long end = System.nanoTime(); | |
System.out.println("Java Total time: " + (end - start) / 1000); | |
executor.shutdownNow(); | |
} | |
private static void runAkka() { | |
ActorSystem system = ActorSystem.create("NotificationSystem"); | |
ActorRef master = system.actorOf(new Props(new UntypedActorFactory() { | |
@Override | |
public UntypedActor create() { | |
return new Master(threadCount, opCount); | |
} | |
}), "producer"); | |
master.tell("start"); | |
} | |
public static void main(String[] args) throws InterruptedException { | |
HazelcastManager.getNotificationMap(); | |
Thread.sleep(1000); | |
runJava(); | |
runAkka(); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Master extends UntypedActor { | |
private final ActorRef producerRouter; | |
private final int nrOfProducers; | |
private final int opCount; | |
private int nrOfResults; | |
private long start; | |
public Master(final int nrOfProducers, final int opCount) { | |
this.nrOfProducers = nrOfProducers; | |
this.opCount = opCount; | |
this.producerRouter = this.getContext().actorOf(new Props(Producer.class).withRouter(new RoundRobinRouter(nrOfProducers)), | |
"producerRouter"); | |
} | |
@Override | |
public void onReceive(Object message) throws Exception { | |
if (message instanceof String) { | |
if ("start".equals(message)) { | |
start = System.nanoTime(); | |
nrOfResults = 0; | |
for (int i = 0; i < this.nrOfProducers; i++) { | |
producerRouter.tell(this.opCount, getSelf()); | |
} | |
} else if ("finished".equals(message)) { | |
nrOfResults++; | |
if (nrOfResults == nrOfProducers) { | |
long end = System.nanoTime(); | |
System.out.println("Akka Total time: " + (end - start) / 1000); | |
getContext().system().shutdown(); | |
HazelcastManager.shutdown(); | |
} | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public final class Notification implements Serializable { | |
public final long timestamp; //bildirimin oluşturulma zamanı | |
public final String user; //ilgili kullanıcı adı | |
public final String text; //bildirim mesajı | |
public Notification(final long timestamp, final String user, final String text) { | |
this.timestamp = timestamp; | |
this.user = user; | |
this.text = text; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public final class QueueMetadata implements Serializable { | |
public final int entryCount; //bildirim kuyruğu eleman sayısı | |
public final long oldestEntryTimestamp; //kuyruktaki en eski elemanın oluşturulma zamanı | |
public QueueMetadata(int entryCount, long oldestEntryTimestamp) { | |
this.entryCount = entryCount; | |
this.oldestEntryTimestamp = oldestEntryTimestamp; | |
} | |
public QueueMetadata increment() { | |
return new QueueMetadata(entryCount + 1, oldestEntryTimestamp); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment