Skip to content

Instantly share code, notes, and snippets.

@Groostav
Created August 1, 2014 01:49
Show Gist options
  • Save Groostav/6b69c4b1c5691cab703b to your computer and use it in GitHub Desktop.
Save Groostav/6b69c4b1c5691cab703b to your computer and use it in GitHub Desktop.
Suggested changes to Guava EventBus to allow annotation-specified concurrency
diff --git a/guava-testlib/src/com/google/common/testing/AbstractPackageSanityTests.java b/guava-testlib/src/com/google/common/testing/AbstractPackageSanityTests.java
index d324a38..48dfb78 100644
--- a/guava-testlib/src/com/google/common/testing/AbstractPackageSanityTests.java
+++ b/guava-testlib/src/com/google/common/testing/AbstractPackageSanityTests.java
@@ -16,10 +16,6 @@
package com.google.common.testing;
-import static com.google.common.base.Predicates.and;
-import static com.google.common.base.Predicates.not;
-import static com.google.common.testing.AbstractPackageSanityTests.Chopper.suffix;
-
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
@@ -33,10 +29,8 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.reflect.ClassPath;
import com.google.common.testing.NullPointerTester.Visibility;
-
import junit.framework.AssertionFailedError;
import junit.framework.TestCase;
-
import org.junit.Test;
import java.io.IOException;
@@ -47,6 +41,10 @@ import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;
+import static com.google.common.base.Predicates.and;
+import static com.google.common.base.Predicates.not;
+import static com.google.common.testing.AbstractPackageSanityTests.Chopper.suffix;
+
/**
* Automatically runs sanity checks against top level classes in the same package of the test that
* extends {@code AbstractPackageSanityTests}. Currently sanity checks include {@link
diff --git a/guava-tests/test/com/google/common/eventbus/EventSubscriberTest.java b/guava-tests/test/com/google/common/eventbus/EventSubscriberTest.java
index b0a332a..682e474 100644
--- a/guava-tests/test/com/google/common/eventbus/EventSubscriberTest.java
+++ b/guava-tests/test/com/google/common/eventbus/EventSubscriberTest.java
@@ -50,7 +50,7 @@ public class EventSubscriberTest extends TestCase {
public void testBasicMethodCall() throws Exception {
Method method = getRecordingMethod();
- EventSubscriber subscriber = new EventSubscriber(this, method);
+ EventSubscriber subscriber = new EventSubscriber(this, method, Synchronizer.DEFAULT_INSTANCE);
subscriber.handleEvent(FIXTURE_ARGUMENT);
@@ -61,7 +61,7 @@ public class EventSubscriberTest extends TestCase {
public void testExceptionWrapping() {
Method method = getExceptionThrowingMethod();
- EventSubscriber subscriber = new EventSubscriber(this, method);
+ EventSubscriber subscriber = new EventSubscriber(this, method, Synchronizer.DEFAULT_INSTANCE);
try {
subscriber.handleEvent(new Object());
@@ -74,7 +74,7 @@ public class EventSubscriberTest extends TestCase {
public void testErrorPassthrough() throws InvocationTargetException {
Method method = getErrorThrowingMethod();
- EventSubscriber subscriber = new EventSubscriber(this, method);
+ EventSubscriber subscriber = new EventSubscriber(this, method, Synchronizer.DEFAULT_INSTANCE);
try {
subscriber.handleEvent(new Object());
@@ -89,13 +89,17 @@ public class EventSubscriberTest extends TestCase {
Method concat = String.class.getMethod("concat", String.class);
new EqualsTester()
.addEqualityGroup(
- new EventSubscriber("foo", charAt), new EventSubscriber("foo", charAt))
- .addEqualityGroup(new EventSubscriber("bar", charAt))
- .addEqualityGroup(new EventSubscriber("foo", concat))
+ makeSubscriber("foo", charAt), makeSubscriber("foo", charAt))
+ .addEqualityGroup(makeSubscriber("bar", charAt))
+ .addEqualityGroup(makeSubscriber("foo", concat))
.testEquals();
}
- /**
+ private EventSubscriber makeSubscriber(String target, Method method) {
+ return new EventSubscriber(target, method, Synchronizer.DEFAULT_INSTANCE);
+ }
+
+ /**
* Gets a reference to {@link #recordingMethod(Object)}.
*
* @return a Method wrapping {@link #recordingMethod(Object)}.
diff --git a/guava-tests/test/com/google/common/eventbus/ExplicitThreadSupplierTest.java b/guava-tests/test/com/google/common/eventbus/ExplicitThreadSupplierTest.java
new file mode 100644
index 0000000..24ba23e
--- /dev/null
+++ b/guava-tests/test/com/google/common/eventbus/ExplicitThreadSupplierTest.java
@@ -0,0 +1,139 @@
+package com.google.common.eventbus;
+
+import com.sun.javafx.application.PlatformImpl;
+import javafx.application.Platform;
+import junit.framework.TestCase;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Checks that {@link OnThread} makes {@link EventBus} invoke a subscriber through the requested
+ * {@link Synchronizer}, and that subscribers without it still run on the posting thread.
+ */
+public class ExplicitThreadSupplierTest extends TestCase {
+
+  private boolean messageRecieved = false;
+  private Thread threadUsed = null;
+  private boolean wasRecievedOnJavaFXThread = false;
+
+  /** Runs an action on the JavaFX application thread and waits for it to finish. */
+  public static class FXThreadSynchronizer implements Synchronizer {
+
+    public static final Runnable DoNothing = new Runnable() { public void run() {} };
+
+    static {
+      // Start the JavaFX platform up front so Platform.runLater below has somewhere to run.
+      PlatformImpl.startup(DoNothing);
+    }
+
+    @Override
+    public void synchronize(final Runnable actionToPerformSynchronously) {
+
+      // Already on the FX thread: queueing the action and then waiting for it would block
+      // the only thread able to run it, so run it inline.
+      if (Platform.isFxApplicationThread()) {
+        actionToPerformSynchronously.run();
+        return;
+      }
+
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      Platform.runLater(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            actionToPerformSynchronously.run();
+          } finally {
+            // Always release the waiting caller, even when the action throws.
+            latch.countDown();
+          }
+        }
+      });
+
+      try {
+        latch.await();
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to callers further up the stack.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /** Subscriber annotated directly with {@link OnThread}. */
+  public class FXSubscriber {
+    @Subscribe
+    @OnThread(FXThreadSynchronizer.class)
+    public void handle(String message) {
+      wasRecievedOnJavaFXThread = Platform.isFxApplicationThread();
+      messageRecieved = true;
+    }
+  }
+
+  public void testExplicitHandlingThread() {
+    //setup
+    EventBus eventBus = new EventBus();
+    eventBus.register(new FXSubscriber());
+
+    //act
+    eventBus.post("testing my cool new feature!");
+
+    //assert
+    assertTrue(messageRecieved);
+    assertTrue(wasRecievedOnJavaFXThread);
+  }
+
+
+  /** Subscriber with no {@link OnThread}; expected to run on the posting thread. */
+  public class NoExplicitThreadSubscriber {
+    @Subscribe
+    public void handle(String message) {
+      threadUsed = Thread.currentThread();
+      messageRecieved = true;
+    }
+  }
+
+  public void testImplicitHandlingThread() {
+    //setup
+    EventBus eventBus = new EventBus();
+    eventBus.register(new NoExplicitThreadSubscriber());
+
+    //act
+    eventBus.post("testing the old features!");
+
+    //assert
+    assertTrue(messageRecieved);
+    assertEquals(Thread.currentThread(), threadUsed);
+  }
+
+  /** Custom annotation that carries the {@link OnThread} request for its users. */
+  @Retention(RetentionPolicy.RUNTIME)
+  @OnThread(FXThreadSynchronizer.class)
+  public static @interface OnJavaFXThread {
+  }
+
+  /** Subscriber that picks up {@link OnThread} indirectly through {@link OnJavaFXThread}. */
+  public class ExplicitByCustomAnnotationSubscriber {
+    @Subscribe
+    @OnJavaFXThread
+    public void handle(String message) {
+      messageRecieved = true;
+      wasRecievedOnJavaFXThread = Platform.isFxApplicationThread();
+    }
+  }
+
+  public void testThreadImpliedByAnnotatedAnnotation() {
+    //setup
+    EventBus eventbus = new EventBus();
+    eventbus.register(new ExplicitByCustomAnnotationSubscriber());
+
+    //act
+    eventbus.post("testing a wierder version of my new feature which might just be jumping the shark!");
+
+    //assert
+    assertTrue(messageRecieved);
+    assertTrue(wasRecievedOnJavaFXThread);
+  }
+}
diff --git a/guava-tests/test/com/google/common/eventbus/PackageSanityTests.java b/guava-tests/test/com/google/common/eventbus/PackageSanityTests.java
index 5f984b0..48fbdcc 100644
--- a/guava-tests/test/com/google/common/eventbus/PackageSanityTests.java
+++ b/guava-tests/test/com/google/common/eventbus/PackageSanityTests.java
@@ -18,9 +18,8 @@ package com.google.common.eventbus;
import com.google.common.testing.AbstractPackageSanityTests;
-import java.lang.reflect.Method;
-
import javax.annotation.Nullable;
+import java.lang.reflect.Method;
/**
* Basic sanity tests for the entire package.
@@ -41,7 +40,7 @@ public class PackageSanityTests extends AbstractPackageSanityTests {
public void handle(@Nullable Object anything) {}
EventSubscriber toEventSubscriber() throws Exception {
- return new EventSubscriber(this, subscriberMethod());
+ return new EventSubscriber(this, subscriberMethod(), Synchronizer.DEFAULT_INSTANCE);
}
private static Method subscriberMethod() throws NoSuchMethodException {
diff --git a/guava/src/com/google/common/eventbus/AnnotatedSubscriberFinder.java b/guava/src/com/google/common/eventbus/AnnotatedSubscriberFinder.java
index b1c38ff..2bd34e2 100644
--- a/guava/src/com/google/common/eventbus/AnnotatedSubscriberFinder.java
+++ b/guava/src/com/google/common/eventbus/AnnotatedSubscriberFinder.java
@@ -28,14 +28,14 @@ import com.google.common.collect.Multimap;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.UncheckedExecutionException;
+import javax.annotation.Nullable;
+import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import javax.annotation.Nullable;
-
/**
* A {@link SubscriberFindingStrategy} for collecting all event subscriber methods that are marked
* with the {@link Subscribe} annotation.
@@ -147,15 +147,72 @@ class AnnotatedSubscriberFinder implements SubscriberFindingStrategy {
*/
private static EventSubscriber makeSubscriber(Object listener, Method method) {
EventSubscriber wrapper;
+
+ Synchronizer threadSynchronizer = getThreadSynchronizer(method);
+
if (methodIsDeclaredThreadSafe(method)) {
- wrapper = new EventSubscriber(listener, method);
+ wrapper = new EventSubscriber(listener, method, threadSynchronizer);
} else {
- wrapper = new SynchronizedEventSubscriber(listener, method);
+ wrapper = new SynchronizedEventSubscriber(listener, method, threadSynchronizer);
}
+
return wrapper;
}
- /**
+ /**
+ * Determines which {@link Synchronizer} runs {@code method}.
+ *
+ * <p>An {@link OnThread} annotation is honoured when it sits directly on the method, or on
+ * exactly one other annotation that the method carries.
+ *
+ * @param method the subscriber method to inspect.
+ * @return a new instance of the class named by the applicable {@code OnThread}, or
+ * {@link Synchronizer#DEFAULT_INSTANCE} when no {@code OnThread} applies.
+ * @throws IllegalStateException if more than one {@code OnThread} applies to the method.
+ * @throws IllegalArgumentException if the named synchronizer class cannot be instantiated.
+ */
+ private static Synchronizer getThreadSynchronizer(Method method) {
+
+ OnThread direct = method.getAnnotation(OnThread.class);
+ Annotation carrier = null;
+ OnThread fromCarrier = null;
+
+ for (Annotation annotation : method.getAnnotations()) {
+
+ OnThread candidate = annotation.annotationType().getAnnotation(OnThread.class);
+ if (candidate == null) {
+ continue;
+ }
+ if (direct != null) {
+ throw new IllegalStateException(
+ "found an " + OnThread.class.getSimpleName() + " annotation " +
+ "on both '" + method + "' and '" + annotation.annotationType() + "'!");
+ }
+ if (carrier != null) {
+ throw new IllegalStateException(
+ "found an " + OnThread.class.getSimpleName() + " annotation " +
+ "on both '" + carrier.annotationType() + "' and '" + annotation.annotationType() + "'!");
+ }
+ carrier = annotation;
+ fromCarrier = candidate;
+ }
+
+ OnThread threadAnnotation = (direct != null) ? direct : fromCarrier;
+ if (threadAnnotation == null) {
+ return Synchronizer.DEFAULT_INSTANCE;
+ }
+
+ Class<? extends Synchronizer> synchronizerType = threadAnnotation.value();
+ try {
+ return synchronizerType.newInstance();
+ } catch (ReflectiveOperationException e) {
+ // Name the offending class and method; a bare wrapper hides which listener is at fault.
+ throw new IllegalArgumentException(
+ "could not instantiate " + synchronizerType.getName() + " requested for '" + method + "'", e);
+ }
+ }
+
+ /**
* Checks whether {@code method} is thread-safe, as indicated by the
* {@link AllowConcurrentEvents} annotation.
*
diff --git a/guava/src/com/google/common/eventbus/EventSubscriber.java b/guava/src/com/google/common/eventbus/EventSubscriber.java
index 058aeab..b8f023e 100644
--- a/guava/src/com/google/common/eventbus/EventSubscriber.java
+++ b/guava/src/com/google/common/eventbus/EventSubscriber.java
@@ -16,14 +16,16 @@
package com.google.common.eventbus;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Atomics;
+import javax.annotation.Nullable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
-import javax.annotation.Nullable;
+import static com.google.common.base.Preconditions.checkNotNull;
/**
* Wraps a single-argument subscriber method on a specific object.
@@ -44,19 +46,23 @@ class EventSubscriber {
/** Subscriber method. */
private final Method method;
+ private final Synchronizer threadSynchronizer;
+
/**
* Creates a new EventSubscriber to wrap {@code method} on @{code target}.
*
* @param target object to which the method applies.
* @param method subscriber method.
*/
- EventSubscriber(Object target, Method method) {
- Preconditions.checkNotNull(target,
- "EventSubscriber target cannot be null.");
+ EventSubscriber(Object target, Method method, Synchronizer threadSynchronizer) {
+ Preconditions.checkNotNull(target, "EventSubscriber target cannot be null.");
Preconditions.checkNotNull(method, "EventSubscriber method cannot be null.");
+ Preconditions.checkNotNull(threadSynchronizer, "EventSubscriber threadSynchronizer cannot be null.");
this.target = target;
this.method = method;
+ this.threadSynchronizer = threadSynchronizer;
+
method.setAccessible(true);
}
@@ -68,20 +74,54 @@ class EventSubscriber {
* {@link Throwable} that is not an {@link Error} ({@code Error} instances are
* propagated as-is).
*/
- public void handleEvent(Object event) throws InvocationTargetException {
+ public void handleEvent(final Object event) throws InvocationTargetException {
checkNotNull(event);
- try {
- method.invoke(target, new Object[] { event });
- } catch (IllegalArgumentException e) {
- throw new Error("Method rejected target/argument: " + event, e);
- } catch (IllegalAccessException e) {
- throw new Error("Method became inaccessible: " + event, e);
- } catch (InvocationTargetException e) {
- if (e.getCause() instanceof Error) {
- throw (Error) e.getCause();
+
+ // The synchronizer may run the action on another thread, so its outcome is handed back
+ // through these holders rather than thrown from run().
+ final AtomicBoolean finished = new AtomicBoolean(false);
+ final AtomicReference<Exception> problem = new AtomicReference<Exception>(null);
+
+ threadSynchronizer.synchronize(new Runnable() {
+ @Override
+ public void run() {
+ try{
+ method.invoke(target, new Object[] { event });
+ } catch (Exception e){
+ problem.set(e);
+ } finally {
+ finished.set(true);
+ }
+ }
+ });
+
+ // A synchronizer that returns before the action has run breaks the blocking contract.
+ if( ! finished.get()){
+ throw new IllegalStateException("synchronizer '" + threadSynchronizer + "' " +
+ "specified on the subscribing method '" + method + "' " +
+ "must block the calling thread until the provided action has completed, " +
+ "but it did not.");
+ }
+
+ Exception thrownException = problem.get();
+ if(thrownException != null){
+
+ if(thrownException instanceof IllegalArgumentException) {
+ throw new Error("Method rejected target/argument: " + event, thrownException);
+ } else if (thrownException instanceof IllegalAccessException) {
+ throw new Error("Method became inaccessible: " + event, thrownException);
+ } else if (thrownException instanceof InvocationTargetException) {
+ if (thrownException.getCause() instanceof Error) {
+ throw (Error) thrownException.getCause();
+ }
+ throw (InvocationTargetException) thrownException;
+ } else if (thrownException instanceof RuntimeException) {
+ // Any other exception caught in run() above is rethrown here; without this branch it
+ // would be dropped and the failed delivery would look like a success.
+ throw (RuntimeException) thrownException;
+ }
- throw e;
}
+
}
@Override public String toString() {
diff --git a/guava/src/com/google/common/eventbus/OnThread.java b/guava/src/com/google/common/eventbus/OnThread.java
new file mode 100644
index 0000000..9e59fdc
--- /dev/null
+++ b/guava/src/com/google/common/eventbus/OnThread.java
@@ -0,0 +1,21 @@
+package com.google.common.eventbus;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.ANNOTATION_TYPE;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Names the {@link Synchronizer} through which a subscriber method is invoked.
+ *
+ * <p>May be placed on a subscriber method directly, or on another annotation type that is in
+ * turn placed on the subscriber method.
+ */
+@Target({METHOD, ANNOTATION_TYPE})
+@Retention(RUNTIME)
+public @interface OnThread {
+  /** The synchronizer class to instantiate; defaults to {@link Synchronizer.Default}. */
+  Class<? extends Synchronizer> value() default Synchronizer.Default.class;
+}
diff --git a/guava/src/com/google/common/eventbus/SynchronizedEventSubscriber.java b/guava/src/com/google/common/eventbus/SynchronizedEventSubscriber.java
index a074818..66fb2e6 100644
--- a/guava/src/com/google/common/eventbus/SynchronizedEventSubscriber.java
+++ b/guava/src/com/google/common/eventbus/SynchronizedEventSubscriber.java
@@ -36,8 +36,8 @@ final class SynchronizedEventSubscriber extends EventSubscriber {
* @param target object to which the method applies.
* @param method subscriber method.
*/
- public SynchronizedEventSubscriber(Object target, Method method) {
- super(target, method);
+ public SynchronizedEventSubscriber(Object target, Method method, Synchronizer threadSynchronizer) {
+ super(target, method, threadSynchronizer);
}
@Override
diff --git a/guava/src/com/google/common/eventbus/Synchronizer.java b/guava/src/com/google/common/eventbus/Synchronizer.java
new file mode 100644
index 0000000..6306224
--- /dev/null
+++ b/guava/src/com/google/common/eventbus/Synchronizer.java
@@ -0,0 +1,24 @@
+package com.google.common.eventbus;
+
+/**
+ * Strategy that decides how a subscriber invocation is run, for example on a particular thread.
+ */
+public interface Synchronizer {
+
+  /**
+   * Runs {@code actionToPerformSynchronously} and returns only after it has completed.
+   * {@code EventSubscriber.handleEvent} throws {@link IllegalStateException} otherwise.
+   */
+  void synchronize(Runnable actionToPerformSynchronously);
+
+  /** Runs the action directly on the calling thread. */
+  class Default implements Synchronizer {
+    @Override
+    public void synchronize(Runnable actionToPerformSynchronously) {
+      actionToPerformSynchronously.run();
+    }
+  }
+
+  /** Shared instance used when a subscriber requests no particular synchronizer. */
+  Default DEFAULT_INSTANCE = new Default();
+}
diff --git a/guava/src/com/google/common/reflect/Types.java b/guava/src/com/google/common/reflect/Types.java
index d6c56e8..a1dabfc 100644
--- a/guava/src/com/google/common/reflect/Types.java
+++ b/guava/src/com/google/common/reflect/Types.java
@@ -29,7 +29,9 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import java.io.Serializable;
+import java.lang.annotation.Annotation;
import java.lang.reflect.AnnotatedElement;
+import java.lang.reflect.AnnotatedType;
import java.lang.reflect.Array;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.GenericDeclaration;
@@ -343,7 +345,12 @@ final class Types {
return name;
}
- @Override public String toString() {
+ @Override
+ public AnnotatedType[] getAnnotatedBounds() {
+ throw new UnsupportedOperationException("getAnnotatedBounds");
+ }
+
+ @Override public String toString() {
return name;
}
@@ -371,6 +378,21 @@ final class Types {
return false;
}
}
+
+ @Override
+ public <T extends Annotation> T getAnnotation(Class<T> annotationClass) {
+ throw new UnsupportedOperationException("getAnnotation");
+ }
+
+ @Override
+ public Annotation[] getAnnotations() {
+ throw new UnsupportedOperationException("getAnnotations");
+ }
+
+ @Override
+ public Annotation[] getDeclaredAnnotations() {
+ throw new UnsupportedOperationException("getDeclaredAnnotations");
+ }
}
static final class WildcardTypeImpl implements WildcardType, Serializable {
diff --git a/groostav-guava-plantation.diff b/groostav-guava-plantation.diff
new file mode 100644
index 0000000..a0cbaa0
Binary files /dev/null and b/groostav-guava-plantation.diff differ
diff --git a/guava-testlib/src/com/google/common/testing/AbstractPackageSanityTests.java b/guava-testlib/src/com/google/common/testing/AbstractPackageSanityTests.java
index d324a38..48dfb78 100644
--- a/guava-testlib/src/com/google/common/testing/AbstractPackageSanityTests.java
+++ b/guava-testlib/src/com/google/common/testing/AbstractPackageSanityTests.java
@@ -16,10 +16,6 @@
package com.google.common.testing;
-import static com.google.common.base.Predicates.and;
-import static com.google.common.base.Predicates.not;
-import static com.google.common.testing.AbstractPackageSanityTests.Chopper.suffix;
-
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
@@ -33,10 +29,8 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.reflect.ClassPath;
import com.google.common.testing.NullPointerTester.Visibility;
-
import junit.framework.AssertionFailedError;
import junit.framework.TestCase;
-
import org.junit.Test;
import java.io.IOException;
@@ -47,6 +41,10 @@ import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;
+import static com.google.common.base.Predicates.and;
+import static com.google.common.base.Predicates.not;
+import static com.google.common.testing.AbstractPackageSanityTests.Chopper.suffix;
+
/**
* Automatically runs sanity checks against top level classes in the same package of the test that
* extends {@code AbstractPackageSanityTests}. Currently sanity checks include {@link
diff --git a/guava-tests/test/com/google/common/eventbus/EventSubscriberTest.java b/guava-tests/test/com/google/common/eventbus/EventSubscriberTest.java
index b0a332a..682e474 100644
--- a/guava-tests/test/com/google/common/eventbus/EventSubscriberTest.java
+++ b/guava-tests/test/com/google/common/eventbus/EventSubscriberTest.java
@@ -50,7 +50,7 @@ public class EventSubscriberTest extends TestCase {
public void testBasicMethodCall() throws Exception {
Method method = getRecordingMethod();
- EventSubscriber subscriber = new EventSubscriber(this, method);
+ EventSubscriber subscriber = new EventSubscriber(this, method, Synchronizer.DEFAULT_INSTANCE);
subscriber.handleEvent(FIXTURE_ARGUMENT);
@@ -61,7 +61,7 @@ public class EventSubscriberTest extends TestCase {
public void testExceptionWrapping() {
Method method = getExceptionThrowingMethod();
- EventSubscriber subscriber = new EventSubscriber(this, method);
+ EventSubscriber subscriber = new EventSubscriber(this, method, Synchronizer.DEFAULT_INSTANCE);
try {
subscriber.handleEvent(new Object());
@@ -74,7 +74,7 @@ public class EventSubscriberTest extends TestCase {
public void testErrorPassthrough() throws InvocationTargetException {
Method method = getErrorThrowingMethod();
- EventSubscriber subscriber = new EventSubscriber(this, method);
+ EventSubscriber subscriber = new EventSubscriber(this, method, Synchronizer.DEFAULT_INSTANCE);
try {
subscriber.handleEvent(new Object());
@@ -89,13 +89,17 @@ public class EventSubscriberTest extends TestCase {
Method concat = String.class.getMethod("concat", String.class);
new EqualsTester()
.addEqualityGroup(
- new EventSubscriber("foo", charAt), new EventSubscriber("foo", charAt))
- .addEqualityGroup(new EventSubscriber("bar", charAt))
- .addEqualityGroup(new EventSubscriber("foo", concat))
+ makeSubscriber("foo", charAt), makeSubscriber("foo", charAt))
+ .addEqualityGroup(makeSubscriber("bar", charAt))
+ .addEqualityGroup(makeSubscriber("foo", concat))
.testEquals();
}
- /**
+ private EventSubscriber makeSubscriber(String target, Method method) {
+ return new EventSubscriber(target, method, Synchronizer.DEFAULT_INSTANCE);
+ }
+
+ /**
* Gets a reference to {@link #recordingMethod(Object)}.
*
* @return a Method wrapping {@link #recordingMethod(Object)}.
diff --git a/guava-tests/test/com/google/common/eventbus/ExplicitThreadSupplierTest.java b/guava-tests/test/com/google/common/eventbus/ExplicitThreadSupplierTest.java
new file mode 100644
index 0000000..24ba23e
--- /dev/null
+++ b/guava-tests/test/com/google/common/eventbus/ExplicitThreadSupplierTest.java
@@ -0,0 +1,139 @@
+package com.google.common.eventbus;
+
+import com.sun.javafx.application.PlatformImpl;
+import javafx.application.Platform;
+import junit.framework.TestCase;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Checks that {@link OnThread} makes {@link EventBus} invoke a subscriber through the requested
+ * {@link Synchronizer}, and that subscribers without it still run on the posting thread.
+ */
+public class ExplicitThreadSupplierTest extends TestCase {
+
+  private boolean messageRecieved = false;
+  private Thread threadUsed = null;
+  private boolean wasRecievedOnJavaFXThread = false;
+
+  /** Runs an action on the JavaFX application thread and waits for it to finish. */
+  public static class FXThreadSynchronizer implements Synchronizer {
+
+    public static final Runnable DoNothing = new Runnable() { public void run() {} };
+
+    static {
+      // Start the JavaFX platform up front so Platform.runLater below has somewhere to run.
+      PlatformImpl.startup(DoNothing);
+    }
+
+    @Override
+    public void synchronize(final Runnable actionToPerformSynchronously) {
+
+      // Already on the FX thread: queueing the action and then waiting for it would block
+      // the only thread able to run it, so run it inline.
+      if (Platform.isFxApplicationThread()) {
+        actionToPerformSynchronously.run();
+        return;
+      }
+
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      Platform.runLater(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            actionToPerformSynchronously.run();
+          } finally {
+            // Always release the waiting caller, even when the action throws.
+            latch.countDown();
+          }
+        }
+      });
+
+      try {
+        latch.await();
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to callers further up the stack.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /** Subscriber annotated directly with {@link OnThread}. */
+  public class FXSubscriber {
+    @Subscribe
+    @OnThread(FXThreadSynchronizer.class)
+    public void handle(String message) {
+      wasRecievedOnJavaFXThread = Platform.isFxApplicationThread();
+      messageRecieved = true;
+    }
+  }
+
+  public void testExplicitHandlingThread() {
+    //setup
+    EventBus eventBus = new EventBus();
+    eventBus.register(new FXSubscriber());
+
+    //act
+    eventBus.post("testing my cool new feature!");
+
+    //assert
+    assertTrue(messageRecieved);
+    assertTrue(wasRecievedOnJavaFXThread);
+  }
+
+
+  /** Subscriber with no {@link OnThread}; expected to run on the posting thread. */
+  public class NoExplicitThreadSubscriber {
+    @Subscribe
+    public void handle(String message) {
+      threadUsed = Thread.currentThread();
+      messageRecieved = true;
+    }
+  }
+
+  public void testImplicitHandlingThread() {
+    //setup
+    EventBus eventBus = new EventBus();
+    eventBus.register(new NoExplicitThreadSubscriber());
+
+    //act
+    eventBus.post("testing the old features!");
+
+    //assert
+    assertTrue(messageRecieved);
+    assertEquals(Thread.currentThread(), threadUsed);
+  }
+
+  /** Custom annotation that carries the {@link OnThread} request for its users. */
+  @Retention(RetentionPolicy.RUNTIME)
+  @OnThread(FXThreadSynchronizer.class)
+  public static @interface OnJavaFXThread {
+  }
+
+  /** Subscriber that picks up {@link OnThread} indirectly through {@link OnJavaFXThread}. */
+  public class ExplicitByCustomAnnotationSubscriber {
+    @Subscribe
+    @OnJavaFXThread
+    public void handle(String message) {
+      messageRecieved = true;
+      wasRecievedOnJavaFXThread = Platform.isFxApplicationThread();
+    }
+  }
+
+  public void testThreadImpliedByAnnotatedAnnotation() {
+    //setup
+    EventBus eventbus = new EventBus();
+    eventbus.register(new ExplicitByCustomAnnotationSubscriber());
+
+    //act
+    eventbus.post("testing a wierder version of my new feature which might just be jumping the shark!");
+
+    //assert
+    assertTrue(messageRecieved);
+    assertTrue(wasRecievedOnJavaFXThread);
+  }
+}
diff --git a/guava-tests/test/com/google/common/eventbus/PackageSanityTests.java b/guava-tests/test/com/google/common/eventbus/PackageSanityTests.java
index 5f984b0..48fbdcc 100644
--- a/guava-tests/test/com/google/common/eventbus/PackageSanityTests.java
+++ b/guava-tests/test/com/google/common/eventbus/PackageSanityTests.java
@@ -18,9 +18,8 @@ package com.google.common.eventbus;
import com.google.common.testing.AbstractPackageSanityTests;
-import java.lang.reflect.Method;
-
import javax.annotation.Nullable;
+import java.lang.reflect.Method;
/**
* Basic sanity tests for the entire package.
@@ -41,7 +40,7 @@ public class PackageSanityTests extends AbstractPackageSanityTests {
public void handle(@Nullable Object anything) {}
EventSubscriber toEventSubscriber() throws Exception {
- return new EventSubscriber(this, subscriberMethod());
+ return new EventSubscriber(this, subscriberMethod(), Synchronizer.DEFAULT_INSTANCE);
}
private static Method subscriberMethod() throws NoSuchMethodException {
diff --git a/guava/src/com/google/common/eventbus/AnnotatedSubscriberFinder.java b/guava/src/com/google/common/eventbus/AnnotatedSubscriberFinder.java
index b1c38ff..aa2fbb9 100644
--- a/guava/src/com/google/common/eventbus/AnnotatedSubscriberFinder.java
+++ b/guava/src/com/google/common/eventbus/AnnotatedSubscriberFinder.java
@@ -28,14 +28,14 @@ import com.google.common.collect.Multimap;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.UncheckedExecutionException;
+import javax.annotation.Nullable;
+import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import javax.annotation.Nullable;
-
/**
* A {@link SubscriberFindingStrategy} for collecting all event subscriber methods that are marked
* with the {@link Subscribe} annotation.
@@ -147,15 +147,72 @@ class AnnotatedSubscriberFinder implements SubscriberFindingStrategy {
*/
private static EventSubscriber makeSubscriber(Object listener, Method method) {
EventSubscriber wrapper;
+
+ Synchronizer threadSynchronizer = getThreadSynchronizer(method);
+
if (methodIsDeclaredThreadSafe(method)) {
- wrapper = new EventSubscriber(listener, method);
+ wrapper = new EventSubscriber(listener, method, threadSynchronizer);
} else {
- wrapper = new SynchronizedEventSubscriber(listener, method);
+ wrapper = new SynchronizedEventSubscriber(listener, method, threadSynchronizer);
}
+
return wrapper;
}
- /**
+ /**
+ * Determines which {@link Synchronizer} runs {@code method}.
+ *
+ * <p>An {@link OnThread} annotation is honoured when it sits directly on the method, or on
+ * exactly one other annotation that the method carries.
+ *
+ * @param method the subscriber method to inspect.
+ * @return a new instance of the class named by the applicable {@code OnThread}, or
+ * {@link Synchronizer#DEFAULT_INSTANCE} when no {@code OnThread} applies.
+ * @throws IllegalStateException if more than one {@code OnThread} applies to the method.
+ * @throws IllegalArgumentException if the named synchronizer class cannot be instantiated.
+ */
+ private static Synchronizer getThreadSynchronizer(Method method) {
+
+ OnThread direct = method.getAnnotation(OnThread.class);
+ Annotation carrier = null;
+ OnThread fromCarrier = null;
+
+ for (Annotation annotation : method.getAnnotations()) {
+
+ OnThread candidate = annotation.annotationType().getAnnotation(OnThread.class);
+ if (candidate == null) {
+ continue;
+ }
+ if (direct != null) {
+ throw new IllegalStateException(
+ "found an " + OnThread.class.getSimpleName() + " annotation " +
+ "on both '" + method + "' and '" + annotation.annotationType() + "'!");
+ }
+ if (carrier != null) {
+ throw new IllegalStateException(
+ "found an " + OnThread.class.getSimpleName() + " annotation " +
+ "on both '" + carrier.annotationType() + "' and '" + annotation.annotationType() + "'!");
+ }
+ carrier = annotation;
+ fromCarrier = candidate;
+ }
+
+ OnThread threadAnnotation = (direct != null) ? direct : fromCarrier;
+ if (threadAnnotation == null) {
+ return Synchronizer.DEFAULT_INSTANCE;
+ }
+
+ Class<? extends Synchronizer> synchronizerType = threadAnnotation.value();
+ try {
+ return synchronizerType.newInstance();
+ } catch (ReflectiveOperationException e) {
+ // Name the offending class and method; a bare wrapper hides which listener is at fault.
+ throw new IllegalArgumentException(
+ "could not instantiate " + synchronizerType.getName() + " requested for '" + method + "'", e);
+ }
+ }
+
+ /**
* Checks whether {@code method} is thread-safe, as indicated by the
* {@link AllowConcurrentEvents} annotation.
*
diff --git a/guava/src/com/google/common/eventbus/EventSubscriber.java b/guava/src/com/google/common/eventbus/EventSubscriber.java
index 058aeab..3f2cb4f 100644
--- a/guava/src/com/google/common/eventbus/EventSubscriber.java
+++ b/guava/src/com/google/common/eventbus/EventSubscriber.java
@@ -16,14 +16,16 @@
package com.google.common.eventbus;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Atomics;
+import javax.annotation.Nullable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
-import javax.annotation.Nullable;
+import static com.google.common.base.Preconditions.checkNotNull;
/**
* Wraps a single-argument subscriber method on a specific object.
@@ -44,19 +46,23 @@ class EventSubscriber {
/** Subscriber method. */
private final Method method;
+ private final Synchronizer threadSynchronizer;
+
/**
* Creates a new EventSubscriber to wrap {@code method} on @{code target}.
*
* @param target object to which the method applies.
* @param method subscriber method.
*/
- EventSubscriber(Object target, Method method) {
- Preconditions.checkNotNull(target,
- "EventSubscriber target cannot be null.");
+ EventSubscriber(Object target, Method method, Synchronizer threadSynchronizer) {
+ Preconditions.checkNotNull(target, "EventSubscriber target cannot be null.");
Preconditions.checkNotNull(method, "EventSubscriber method cannot be null.");
+ Preconditions.checkNotNull(threadSynchronizer, "EventSubscriber threadSynchronizer cannot be null.");
this.target = target;
this.method = method;
+ this.threadSynchronizer = threadSynchronizer;
+
method.setAccessible(true);
}
@@ -68,20 +74,54 @@ class EventSubscriber {
* {@link Throwable} that is not an {@link Error} ({@code Error} instances are
* propagated as-is).
*/
- public void handleEvent(Object event) throws InvocationTargetException {
+ public void handleEvent(final Object event) throws InvocationTargetException {
checkNotNull(event);
- try {
- method.invoke(target, new Object[] { event });
- } catch (IllegalArgumentException e) {
- throw new Error("Method rejected target/argument: " + event, e);
- } catch (IllegalAccessException e) {
- throw new Error("Method became inaccessible: " + event, e);
- } catch (InvocationTargetException e) {
- if (e.getCause() instanceof Error) {
- throw (Error) e.getCause();
+
+ // The synchronizer decides where the subscriber runs, so the outcome of the call is
+ // handed back to this method through the two holders below.
+ final AtomicBoolean finished = new AtomicBoolean(false);
+ final AtomicReference<Exception> problem = new AtomicReference<Exception>(null);
+
+ threadSynchronizer.synchronize(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ method.invoke(target, new Object[] { event });
+ } catch (Exception e) {
+ problem.set(e);
+ } finally {
+ finished.set(true);
+ }
}
- throw e;
+ });
+
+ // Without a completed action there is no outcome to report, so fail fast.
+ if (!finished.get()) {
+ throw new IllegalStateException("synchronizer '" + threadSynchronizer + "' " +
+ "specified on the subscribing method '" + method + "' " +
+ "must block the calling thread until the provided action has completed, " +
+ "but it did not.");
}
+
+ Exception thrownException = problem.get();
+ if (thrownException != null) {
+ if (thrownException instanceof IllegalArgumentException) {
+ throw new Error("Method rejected target/argument: " + event, thrownException);
+ } else if (thrownException instanceof IllegalAccessException) {
+ throw new Error("Method became inaccessible: " + event, thrownException);
+ } else if (thrownException instanceof InvocationTargetException) {
+ if (thrownException.getCause() instanceof Error) {
+ throw (Error) thrownException.getCause();
+ }
+ throw (InvocationTargetException) thrownException;
+ } else if (thrownException instanceof RuntimeException) {
+ // Any other unchecked failure propagated before the synchronizer existed; keep that.
+ throw (RuntimeException) thrownException;
+ }
+ // Nothing captured above may be dropped silently, whatever its type.
+ throw new IllegalStateException(
+ "Unexpected exception from subscriber method '" + method + "'", thrownException);
+ }
}
@Override public String toString() {
diff --git a/guava/src/com/google/common/eventbus/OnThread.java b/guava/src/com/google/common/eventbus/OnThread.java
new file mode 100644
index 0000000..86f0a45
--- /dev/null
+++ b/guava/src/com/google/common/eventbus/OnThread.java
@@ -0,0 +1,51 @@
+package com.google.common.eventbus;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.*;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Annotation to allow execution of handlers on threads other than the one the poster is on.
+ *
+ * <p>Subscribers that are annotated with this annotation in addition to the {@link Subscribe}
+ * annotation will be executed with the specified {@link com.google.common.eventbus.Synchronizer}.
+ *
+ * <p>For example, consider an event handler that will modify Swing view code. To respect
+ * UI threading, any code that modifies view fields must be wrapped in a call to
+ * {@link javax.swing.SwingUtilities#invokeAndWait(Runnable)} or
+ * {@link javax.swing.SwingUtilities#invokeLater(Runnable)}. This requires a consistent amount
+ * of confusing boilerplate code, which is an example of the problems this annotation was created to
+ * address.
+ *
+ * <p>Consider the following {@code Synchronizer}:
+ * <pre>
+ * class SwingThreadSynchronizer implements Synchronizer {
+ *   {@literal @}Override
+ *   public void synchronize(Runnable runnable) {
+ *     try {
+ *       SwingUtilities.invokeAndWait(runnable);
+ *     } catch (Exception e) { // invokeAndWait declares checked exceptions
+ *       throw new RuntimeException(e);
+ *     }
+ *   }
+ * }</pre>
+ * With this class, to solve the above problem a controller that modifies its view
+ * should add this annotation to its subscribers:
+ * <pre>
+ * {@literal @}Subscribe
+ * {@literal @}OnThread(SwingThreadSynchronizer.class)
+ * public void onViewRelevantModelChange(YourModelEvent event) {
+ *   view.information.setText(deriveTextFromModel());
+ *   //... other code that would cause concurrency issues if not run on a Swing thread.
+ * }</pre>
+ *
+ * @see com.google.common.eventbus.Synchronizer
+ * Created by Geoff Groos ([email protected]) on 2014-07-30.
+ */
+@Target({METHOD, ANNOTATION_TYPE})
+@Retention(RUNTIME)
+public @interface OnThread {
+ public Class<? extends Synchronizer> value() default Synchronizer.Default.class; // synchronizer type to use
+}
diff --git a/guava/src/com/google/common/eventbus/SynchronizedEventSubscriber.java b/guava/src/com/google/common/eventbus/SynchronizedEventSubscriber.java
index a074818..66fb2e6 100644
--- a/guava/src/com/google/common/eventbus/SynchronizedEventSubscriber.java
+++ b/guava/src/com/google/common/eventbus/SynchronizedEventSubscriber.java
@@ -36,8 +36,10 @@ final class SynchronizedEventSubscriber extends EventSubscriber {
* @param target object to which the method applies.
* @param method subscriber method.
+ * @param threadSynchronizer synchronizer passed through to {@link EventSubscriber}.
*/
- public SynchronizedEventSubscriber(Object target, Method method) {
- super(target, method);
+ public SynchronizedEventSubscriber(
+ Object target, Method method, Synchronizer threadSynchronizer) {
+ super(target, method, threadSynchronizer);
}
@Override
diff --git a/guava/src/com/google/common/eventbus/Synchronizer.java b/guava/src/com/google/common/eventbus/Synchronizer.java
new file mode 100644
index 0000000..e3f1919
--- /dev/null
+++ b/guava/src/com/google/common/eventbus/Synchronizer.java
@@ -0,0 +1,28 @@
+package com.google.common.eventbus;
+
+/**
+ * Strategy for running an action on behalf of a caller, for example on a specific thread.
+ *
+ * <p>{@link #synchronize} is expected to return only after the action has run to completion;
+ * {@code EventSubscriber} rejects a synchronizer that returns earlier.
+ *
+ * Created by Geoff Groos ([email protected]) on 2014-07-30.
+ */
+public interface Synchronizer {
+
+ /** Shared instance of the {@link Default} synchronizer. */
+ Default DEFAULT_INSTANCE = new Default();
+
+ /**
+ * Runs {@code actionToPerformSynchronously} and returns once it has finished.
+ */
+ void synchronize(Runnable actionToPerformSynchronously);
+
+ /** Synchronizer that runs the action directly on the calling thread. */
+ class Default implements Synchronizer {
+ @Override
+ public void synchronize(Runnable actionToPerformSynchronously) {
+ actionToPerformSynchronously.run();
+ }
+ }
+}
diff --git a/guava/src/com/google/common/reflect/Types.java b/guava/src/com/google/common/reflect/Types.java
index d6c56e8..cdd4ffb 100644
--- a/guava/src/com/google/common/reflect/Types.java
+++ b/guava/src/com/google/common/reflect/Types.java
@@ -29,7 +29,9 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import java.io.Serializable;
+import java.lang.annotation.Annotation;
import java.lang.reflect.AnnotatedElement;
+import java.lang.reflect.AnnotatedType;
import java.lang.reflect.Array;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.GenericDeclaration;
@@ -343,9 +345,15 @@ final class Types {
return name;
}
+ // NOTE(review): AnnotatedType is a Java 8 API; confirm the minimum supported JDK.
+ /** Always throws {@link UnsupportedOperationException}. */
+ @Override public AnnotatedType[] getAnnotatedBounds() {
+ throw new UnsupportedOperationException("getAnnotatedBounds");
+ }
+
@Override public String toString() {
return name;
}
@Override public int hashCode() {
return genericDeclaration.hashCode() ^ name.hashCode();
@@ -371,6 +376,21 @@ final class Types {
return false;
}
}
+
+ /** Always throws {@link UnsupportedOperationException}. */
+ @Override public <T extends Annotation> T getAnnotation(Class<T> annotationClass) {
+ throw new UnsupportedOperationException("getAnnotation");
+ }
+
+ /** Always throws {@link UnsupportedOperationException}. */
+ @Override public Annotation[] getAnnotations() {
+ throw new UnsupportedOperationException("getAnnotations");
+ }
+
+ /** Always throws {@link UnsupportedOperationException}. */
+ @Override public Annotation[] getDeclaredAnnotations() {
+ throw new UnsupportedOperationException("getDeclaredAnnotations");
+ }
}
static final class WildcardTypeImpl implements WildcardType, Serializable {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment