Created
August 1, 2014 01:49
-
-
Save Groostav/6b69c4b1c5691cab703b to your computer and use it in GitHub Desktop.
Suggested changes to Guava EventBus to allow annotation-specified concurrency
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/guava-testlib/src/com/google/common/testing/AbstractPackageSanityTests.java b/guava-testlib/src/com/google/common/testing/AbstractPackageSanityTests.java | |
index d324a38..48dfb78 100644 | |
--- a/guava-testlib/src/com/google/common/testing/AbstractPackageSanityTests.java | |
+++ b/guava-testlib/src/com/google/common/testing/AbstractPackageSanityTests.java | |
@@ -16,10 +16,6 @@ | |
package com.google.common.testing; | |
-import static com.google.common.base.Predicates.and; | |
-import static com.google.common.base.Predicates.not; | |
-import static com.google.common.testing.AbstractPackageSanityTests.Chopper.suffix; | |
- | |
import com.google.common.annotations.Beta; | |
import com.google.common.annotations.VisibleForTesting; | |
import com.google.common.base.Optional; | |
@@ -33,10 +29,8 @@ import com.google.common.collect.Multimap; | |
import com.google.common.collect.Sets; | |
import com.google.common.reflect.ClassPath; | |
import com.google.common.testing.NullPointerTester.Visibility; | |
- | |
import junit.framework.AssertionFailedError; | |
import junit.framework.TestCase; | |
- | |
import org.junit.Test; | |
import java.io.IOException; | |
@@ -47,6 +41,10 @@ import java.util.TreeMap; | |
import java.util.logging.Level; | |
import java.util.logging.Logger; | |
+import static com.google.common.base.Predicates.and; | |
+import static com.google.common.base.Predicates.not; | |
+import static com.google.common.testing.AbstractPackageSanityTests.Chopper.suffix; | |
+ | |
/** | |
* Automatically runs sanity checks against top level classes in the same package of the test that | |
* extends {@code AbstractPackageSanityTests}. Currently sanity checks include {@link | |
diff --git a/guava-tests/test/com/google/common/eventbus/EventSubscriberTest.java b/guava-tests/test/com/google/common/eventbus/EventSubscriberTest.java | |
index b0a332a..682e474 100644 | |
--- a/guava-tests/test/com/google/common/eventbus/EventSubscriberTest.java | |
+++ b/guava-tests/test/com/google/common/eventbus/EventSubscriberTest.java | |
@@ -50,7 +50,7 @@ public class EventSubscriberTest extends TestCase { | |
public void testBasicMethodCall() throws Exception { | |
Method method = getRecordingMethod(); | |
- EventSubscriber subscriber = new EventSubscriber(this, method); | |
+ EventSubscriber subscriber = new EventSubscriber(this, method, Synchronizer.DEFAULT_INSTANCE); | |
subscriber.handleEvent(FIXTURE_ARGUMENT); | |
@@ -61,7 +61,7 @@ public class EventSubscriberTest extends TestCase { | |
public void testExceptionWrapping() { | |
Method method = getExceptionThrowingMethod(); | |
- EventSubscriber subscriber = new EventSubscriber(this, method); | |
+ EventSubscriber subscriber = new EventSubscriber(this, method, Synchronizer.DEFAULT_INSTANCE); | |
try { | |
subscriber.handleEvent(new Object()); | |
@@ -74,7 +74,7 @@ public class EventSubscriberTest extends TestCase { | |
public void testErrorPassthrough() throws InvocationTargetException { | |
Method method = getErrorThrowingMethod(); | |
- EventSubscriber subscriber = new EventSubscriber(this, method); | |
+ EventSubscriber subscriber = new EventSubscriber(this, method, Synchronizer.DEFAULT_INSTANCE); | |
try { | |
subscriber.handleEvent(new Object()); | |
@@ -89,13 +89,17 @@ public class EventSubscriberTest extends TestCase { | |
Method concat = String.class.getMethod("concat", String.class); | |
new EqualsTester() | |
.addEqualityGroup( | |
- new EventSubscriber("foo", charAt), new EventSubscriber("foo", charAt)) | |
- .addEqualityGroup(new EventSubscriber("bar", charAt)) | |
- .addEqualityGroup(new EventSubscriber("foo", concat)) | |
+ makeSubscriber("foo", charAt), makeSubscriber("foo", charAt)) | |
+ .addEqualityGroup(makeSubscriber("bar", charAt)) | |
+ .addEqualityGroup(makeSubscriber("foo", concat)) | |
.testEquals(); | |
} | |
- /** | |
+ private EventSubscriber makeSubscriber(String target, Method method) { | |
+ return new EventSubscriber(target, method, Synchronizer.DEFAULT_INSTANCE); | |
+ } | |
+ | |
+ /** | |
* Gets a reference to {@link #recordingMethod(Object)}. | |
* | |
* @return a Method wrapping {@link #recordingMethod(Object)}. | |
diff --git a/guava-tests/test/com/google/common/eventbus/ExplicitThreadSupplierTest.java b/guava-tests/test/com/google/common/eventbus/ExplicitThreadSupplierTest.java | |
new file mode 100644 | |
index 0000000..24ba23e | |
--- /dev/null | |
+++ b/guava-tests/test/com/google/common/eventbus/ExplicitThreadSupplierTest.java | |
@@ -0,0 +1,119 @@ | |
+package com.google.common.eventbus; | |
+ | |
+import com.sun.javafx.application.PlatformImpl; | |
+import javafx.application.Platform; | |
+import junit.framework.TestCase; | |
+ | |
+import java.lang.annotation.Retention; | |
+import java.lang.annotation.RetentionPolicy; | |
+import java.util.concurrent.CountDownLatch; | |
+ | |
+/** | |
+ * Created by Geoff on 2014-07-30. | |
+ */ | |
+public class ExplicitThreadSupplierTest extends TestCase { | |
+ | |
+ private boolean messageRecieved = false; | |
+ private Thread threadUsed = null; | |
+ private boolean wasRecievedOnJavaFXThread = false; | |
+ | |
+  /** | |
+   * Test {@link Synchronizer} that runs the supplied action on the JavaFX application thread and | |
+   * blocks the caller until that action has finished. | |
+   */ | |
+  public static class FXThreadSynchronizer implements Synchronizer { | |
+ | |
+    public static final Runnable DoNothing = new Runnable() { public void run() {} }; | |
+ | |
+    static { | |
+      // Boots the JavaFX runtime so that Platform.runLater has a thread to dispatch to. | |
+      PlatformImpl.startup(DoNothing); | |
+    } | |
+ | |
+    /** | |
+     * Runs {@code actionToPerformSynchronously} on the JavaFX application thread, returning only | |
+     * once it has completed. | |
+     * | |
+     * @param actionToPerformSynchronously the work to execute on the JavaFX application thread. | |
+     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is | |
+     *     interrupted while waiting; the interrupt flag is restored first. | |
+     */ | |
+    @Override | |
+    public void synchronize(final Runnable actionToPerformSynchronously) { | |
+ | |
+      // Already on the FX thread: queueing the action and then waiting for it here would | |
+      // deadlock, because the thread that has to run it is the one that is blocked. | |
+      if (Platform.isFxApplicationThread()) { | |
+        actionToPerformSynchronously.run(); | |
+        return; | |
+      } | |
+ | |
+      final CountDownLatch latch = new CountDownLatch(1); | |
+ | |
+      Platform.runLater(new Runnable() { | |
+        @Override | |
+        public void run() { | |
+          try { | |
+            actionToPerformSynchronously.run(); | |
+          } finally { | |
+            // Release the waiting caller even when the action throws. | |
+            latch.countDown(); | |
+          } | |
+        } | |
+      }); | |
+ | |
+      try { | |
+        latch.await(); | |
+      } catch (InterruptedException e) { | |
+        Thread.currentThread().interrupt(); | |
+        throw new RuntimeException(e); | |
+      } | |
+    } | |
+  } | |
+ | |
+ public class FXSubscriber { | |
+ @Subscribe | |
+ @OnThread(FXThreadSynchronizer.class) | |
+ public void handle(String message) { | |
+ wasRecievedOnJavaFXThread = Platform.isFxApplicationThread(); | |
+ messageRecieved = true; | |
+ } | |
+ } | |
+ | |
+ public void testExplicitHandlingThread() { | |
+ //setup | |
+ EventBus eventBus = new EventBus(); | |
+ eventBus.register(new FXSubscriber()); | |
+ | |
+ //act | |
+ eventBus.post("testing my cool new feature!"); | |
+ | |
+ //assert | |
+ assertTrue(messageRecieved); | |
+ assertTrue(wasRecievedOnJavaFXThread); | |
+ } | |
+ | |
+ | |
+ public class NoExplicitThreadSubscriber { | |
+ @Subscribe | |
+ public void handle(String message) { | |
+ threadUsed = Thread.currentThread(); | |
+ messageRecieved = true; | |
+ } | |
+ } | |
+ | |
+  /** A subscriber without {@link OnThread} must be invoked on the thread that posted the event. */ | |
+  public void testImplicitHandlingThread() { | |
+    //setup | |
+    EventBus eventBus = new EventBus(); | |
+    eventBus.register(new NoExplicitThreadSubscriber()); | |
+ | |
+    //act | |
+    eventBus.post("testing the old features!"); | |
+ | |
+    //assert | |
+    assertTrue(messageRecieved); | |
+    // JUnit's signature is assertEquals(expected, actual). | |
+    assertEquals(Thread.currentThread(), threadUsed); | |
+  } | |
+ | |
+ @Retention(RetentionPolicy.RUNTIME) | |
+ @OnThread(FXThreadSynchronizer.class) | |
+ public static @interface OnJavaFXThread { | |
+ } | |
+ | |
+ public class ExplicitByCustomAnnotationSubscriber { | |
+ @Subscribe | |
+ @OnJavaFXThread | |
+ public void handle(String message) { | |
+ messageRecieved = true; | |
+ wasRecievedOnJavaFXThread = Platform.isFxApplicationThread(); | |
+ } | |
+ } | |
+ | |
+ public void testThreadImpliedByAnnotatedAnnotation() { | |
+ //setup | |
+ EventBus eventbus = new EventBus(); | |
+ eventbus.register(new ExplicitByCustomAnnotationSubscriber()); | |
+ | |
+ //act | |
+ eventbus.post("testing a wierder version of my new feature which might just be jumping the shark!"); | |
+ | |
+ //assert | |
+ assertTrue(messageRecieved); | |
+ assertTrue(wasRecievedOnJavaFXThread); | |
+ } | |
+} | |
diff --git a/guava-tests/test/com/google/common/eventbus/PackageSanityTests.java b/guava-tests/test/com/google/common/eventbus/PackageSanityTests.java | |
index 5f984b0..48fbdcc 100644 | |
--- a/guava-tests/test/com/google/common/eventbus/PackageSanityTests.java | |
+++ b/guava-tests/test/com/google/common/eventbus/PackageSanityTests.java | |
@@ -18,9 +18,8 @@ package com.google.common.eventbus; | |
import com.google.common.testing.AbstractPackageSanityTests; | |
-import java.lang.reflect.Method; | |
- | |
import javax.annotation.Nullable; | |
+import java.lang.reflect.Method; | |
/** | |
* Basic sanity tests for the entire package. | |
@@ -41,7 +40,7 @@ public class PackageSanityTests extends AbstractPackageSanityTests { | |
public void handle(@Nullable Object anything) {} | |
EventSubscriber toEventSubscriber() throws Exception { | |
- return new EventSubscriber(this, subscriberMethod()); | |
+ return new EventSubscriber(this, subscriberMethod(), Synchronizer.DEFAULT_INSTANCE); | |
} | |
private static Method subscriberMethod() throws NoSuchMethodException { | |
diff --git a/guava/src/com/google/common/eventbus/AnnotatedSubscriberFinder.java b/guava/src/com/google/common/eventbus/AnnotatedSubscriberFinder.java | |
index b1c38ff..2bd34e2 100644 | |
--- a/guava/src/com/google/common/eventbus/AnnotatedSubscriberFinder.java | |
+++ b/guava/src/com/google/common/eventbus/AnnotatedSubscriberFinder.java | |
@@ -28,14 +28,14 @@ import com.google.common.collect.Multimap; | |
import com.google.common.reflect.TypeToken; | |
import com.google.common.util.concurrent.UncheckedExecutionException; | |
+import javax.annotation.Nullable; | |
+import java.lang.annotation.Annotation; | |
import java.lang.reflect.Method; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Set; | |
-import javax.annotation.Nullable; | |
- | |
/** | |
* A {@link SubscriberFindingStrategy} for collecting all event subscriber methods that are marked | |
* with the {@link Subscribe} annotation. | |
@@ -147,15 +147,62 @@ class AnnotatedSubscriberFinder implements SubscriberFindingStrategy { | |
*/ | |
private static EventSubscriber makeSubscriber(Object listener, Method method) { | |
EventSubscriber wrapper; | |
+ | |
+ Synchronizer threadSynchronizer = getThreadSynchronizer(method); | |
+ | |
if (methodIsDeclaredThreadSafe(method)) { | |
- wrapper = new EventSubscriber(listener, method); | |
+ wrapper = new EventSubscriber(listener, method, threadSynchronizer); | |
} else { | |
- wrapper = new SynchronizedEventSubscriber(listener, method); | |
+ wrapper = new SynchronizedEventSubscriber(listener, method, threadSynchronizer); | |
} | |
+ | |
return wrapper; | |
} | |
- /** | |
+  /** | |
+   * Resolves the {@link Synchronizer} through which {@code method} must be invoked. | |
+   * | |
+   * <p>An {@link OnThread} annotation is honoured when it sits directly on the method, or on | |
+   * exactly one annotation type that is itself present on the method. When none is found the | |
+   * shared {@link Synchronizer#DEFAULT_INSTANCE} is returned; otherwise a fresh instance of the | |
+   * class named by {@link OnThread#value()} is created. | |
+   * | |
+   * @param method the subscriber method to inspect. | |
+   * @return the synchronizer to use for {@code method}; never null. | |
+   * @throws IllegalStateException if {@link OnThread} is reachable from more than one place, or | |
+   *     if the requested synchronizer class cannot be instantiated. | |
+   */ | |
+  private static Synchronizer getThreadSynchronizer(Method method) { | |
+ | |
+    OnThread methodAnnotation = method.getAnnotation(OnThread.class); | |
+    Annotation annotatedAnnotation = null; | |
+ | |
+    for (Annotation annotation : method.getAnnotations()) { | |
+ | |
+      if (annotation.annotationType().getAnnotation(OnThread.class) == null) { | |
+        continue; | |
+      } | |
+      if (methodAnnotation != null) { | |
+        throw new IllegalStateException( | |
+            "found an " + OnThread.class.getSimpleName() + " annotation " + | |
+            "on both '" + method + "' and '" + annotation.annotationType() + "'!"); | |
+      } | |
+      if (annotatedAnnotation != null) { | |
+        throw new IllegalStateException( | |
+            "found an " + OnThread.class.getSimpleName() + " annotation " + | |
+            "on both '" + annotatedAnnotation.annotationType() + "' and '" + annotation.annotationType() + "'!"); | |
+      } | |
+      annotatedAnnotation = annotation; | |
+    } | |
+ | |
+    OnThread threadAnnotation = methodAnnotation; | |
+    if (threadAnnotation == null && annotatedAnnotation != null) { | |
+      threadAnnotation = annotatedAnnotation.annotationType().getAnnotation(OnThread.class); | |
+    } | |
+    if (threadAnnotation == null) { | |
+      return Synchronizer.DEFAULT_INSTANCE; | |
+    } | |
+ | |
+    Class<? extends Synchronizer> synchronizerType = threadAnnotation.value(); | |
+    try { | |
+      return synchronizerType.newInstance(); | |
+    } catch (ReflectiveOperationException e) { | |
+      // Name the offending class and method; the bare cause alone does not say which | |
+      // subscriber registration failed. | |
+      throw new IllegalStateException( | |
+          "could not instantiate synchronizer '" + synchronizerType.getName() + "' " + | |
+          "requested for subscriber method '" + method + "'; " + | |
+          "it needs an accessible no-argument constructor", e); | |
+    } | |
+  } | |
+ | |
+ /** | |
* Checks whether {@code method} is thread-safe, as indicated by the | |
* {@link AllowConcurrentEvents} annotation. | |
* | |
diff --git a/guava/src/com/google/common/eventbus/EventSubscriber.java b/guava/src/com/google/common/eventbus/EventSubscriber.java | |
index 058aeab..b8f023e 100644 | |
--- a/guava/src/com/google/common/eventbus/EventSubscriber.java | |
+++ b/guava/src/com/google/common/eventbus/EventSubscriber.java | |
@@ -16,14 +16,16 @@ | |
package com.google.common.eventbus; | |
-import static com.google.common.base.Preconditions.checkNotNull; | |
- | |
import com.google.common.base.Preconditions; | |
+import com.google.common.util.concurrent.Atomics; | |
+import javax.annotation.Nullable; | |
import java.lang.reflect.InvocationTargetException; | |
import java.lang.reflect.Method; | |
+import java.util.concurrent.atomic.AtomicBoolean; | |
+import java.util.concurrent.atomic.AtomicReference; | |
-import javax.annotation.Nullable; | |
+import static com.google.common.base.Preconditions.checkNotNull; | |
/** | |
* Wraps a single-argument subscriber method on a specific object. | |
@@ -44,19 +46,23 @@ class EventSubscriber { | |
/** Subscriber method. */ | |
private final Method method; | |
+ private final Synchronizer threadSynchronizer; | |
+ | |
/** | |
* Creates a new EventSubscriber to wrap {@code method} on @{code target}. | |
* | |
* @param target object to which the method applies. | |
* @param method subscriber method. | |
*/ | |
- EventSubscriber(Object target, Method method) { | |
- Preconditions.checkNotNull(target, | |
- "EventSubscriber target cannot be null."); | |
+ EventSubscriber(Object target, Method method, Synchronizer threadSynchronizer) { | |
+ Preconditions.checkNotNull(target, "EventSubscriber target cannot be null."); | |
Preconditions.checkNotNull(method, "EventSubscriber method cannot be null."); | |
+ Preconditions.checkNotNull(threadSynchronizer, "EventSubscriber threadSynchronizer cannot be null."); | |
this.target = target; | |
this.method = method; | |
+ this.threadSynchronizer = threadSynchronizer; | |
+ | |
method.setAccessible(true); | |
} | |
@@ -68,20 +74,47 @@ class EventSubscriber { | |
* {@link Throwable} that is not an {@link Error} ({@code Error} instances are | |
* propagated as-is). | |
*/ | |
- public void handleEvent(Object event) throws InvocationTargetException { | |
+  public void handleEvent(final Object event) throws InvocationTargetException { | |
checkNotNull(event); | |
- try { | |
- method.invoke(target, new Object[] { event }); | |
- } catch (IllegalArgumentException e) { | |
- throw new Error("Method rejected target/argument: " + event, e); | |
- } catch (IllegalAccessException e) { | |
- throw new Error("Method became inaccessible: " + event, e); | |
- } catch (InvocationTargetException e) { | |
- if (e.getCause() instanceof Error) { | |
- throw (Error) e.getCause(); | |
+ | |
+    // The synchronizer may run the action on another thread, so its outcome is handed back | |
+    // through these holders rather than thrown from inside run(). | |
+    final AtomicBoolean finished = new AtomicBoolean(false); | |
+    final AtomicReference<Exception> problem = new AtomicReference<Exception>(null); | |
+ | |
+    threadSynchronizer.synchronize(new Runnable() { | |
+      @Override | |
+      public void run() { | |
+        try { | |
+          method.invoke(target, new Object[] { event }); | |
+        } catch (Exception e) { | |
+          problem.set(e); | |
+        } finally { | |
+          finished.set(true); | |
+        } | |
+      } | |
+    }); | |
+ | |
+    // A synchronizer that returns before the action ran has broken its contract. | |
+    if (!finished.get()) { | |
+      throw new IllegalStateException("synchronizer '" + threadSynchronizer + "' " + | |
+          "specified on the subscribing method '" + method + "' " + | |
+          "must block the calling thread until the provided action has completed, " + | |
+          "but it did not."); | |
+    } | |
+ | |
+    Exception thrownException = problem.get(); | |
+    if (thrownException != null) { | |
+ | |
+      if (thrownException instanceof IllegalArgumentException) { | |
+        throw new Error("Method rejected target/argument: " + event, thrownException); | |
+      } else if (thrownException instanceof IllegalAccessException) { | |
+        throw new Error("Method became inaccessible: " + event, thrownException); | |
+      } else if (thrownException instanceof InvocationTargetException) { | |
+        if (thrownException.getCause() instanceof Error) { | |
+          throw (Error) thrownException.getCause(); | |
+        } | |
+        throw (InvocationTargetException) thrownException; | |
+      } else if (thrownException instanceof RuntimeException) { | |
+        // Any other unchecked failure used to propagate to the caller; keep doing so. | |
+        throw (RuntimeException) thrownException; | |
+      } else { | |
+        // Not expected from Method.invoke, but never drop a captured failure silently. | |
+        throw new IllegalStateException( | |
+            "Unexpected exception while dispatching " + event + " to " + method, thrownException); | |
} | |
- throw e; | |
} | |
+ | |
} | |
@Override public String toString() { | |
diff --git a/guava/src/com/google/common/eventbus/OnThread.java b/guava/src/com/google/common/eventbus/OnThread.java | |
new file mode 100644 | |
index 0000000..9e59fdc | |
--- /dev/null | |
+++ b/guava/src/com/google/common/eventbus/OnThread.java | |
@@ -0,0 +1,16 @@ | |
+package com.google.common.eventbus; | |
+ | |
+import java.lang.annotation.Retention; | |
+import java.lang.annotation.Target; | |
+ | |
+import static java.lang.annotation.ElementType.*; | |
+import static java.lang.annotation.RetentionPolicy.RUNTIME; | |
+ | |
+/** | |
+ * Declares which {@link Synchronizer} a subscriber method must be invoked through. | |
+ * | |
+ * <p>May be placed directly on a subscriber method, or on another annotation type that is then | |
+ * placed on the method. {@code AnnotatedSubscriberFinder#getThreadSynchronizer} rejects a method | |
+ * that picks this annotation up from more than one of those places with an | |
+ * {@link IllegalStateException}. | |
+ * | |
+ * <p>Created by Geoff on 2014-07-30. | |
+ */ | |
+@Target({METHOD, ANNOTATION_TYPE}) | |
+@Retention(RUNTIME) | |
+public @interface OnThread { | |
+ /** | |
+ * The {@link Synchronizer} implementation to use. {@code AnnotatedSubscriberFinder} creates it | |
+ * reflectively with {@code Class#newInstance()}, so it needs an accessible no-argument | |
+ * constructor. Defaults to {@link Synchronizer.Default}, which runs the action directly on the | |
+ * calling thread. | |
+ */ | |
+ public Class<? extends Synchronizer> value() default Synchronizer.Default.class; | |
+} | |
diff --git a/guava/src/com/google/common/eventbus/SynchronizedEventSubscriber.java b/guava/src/com/google/common/eventbus/SynchronizedEventSubscriber.java | |
index a074818..66fb2e6 100644 | |
--- a/guava/src/com/google/common/eventbus/SynchronizedEventSubscriber.java | |
+++ b/guava/src/com/google/common/eventbus/SynchronizedEventSubscriber.java | |
@@ -36,8 +36,8 @@ final class SynchronizedEventSubscriber extends EventSubscriber { | |
* @param target object to which the method applies. | |
* @param method subscriber method. | |
*/ | |
- public SynchronizedEventSubscriber(Object target, Method method) { | |
- super(target, method); | |
+ public SynchronizedEventSubscriber(Object target, Method method, Synchronizer threadSynchronizer) { | |
+ super(target, method, threadSynchronizer); | |
} | |
@Override | |
diff --git a/guava/src/com/google/common/eventbus/Synchronizer.java b/guava/src/com/google/common/eventbus/Synchronizer.java | |
new file mode 100644 | |
index 0000000..6306224 | |
--- /dev/null | |
+++ b/guava/src/com/google/common/eventbus/Synchronizer.java | |
@@ -0,0 +1,18 @@ | |
+package com.google.common.eventbus; | |
+ | |
+/** | |
+* Strategy that decides where and how a subscriber invocation is executed. | |
+* | |
+* <p>{@code EventSubscriber#handleEvent} wraps the reflective subscriber call in a | |
+* {@link Runnable} and passes it to {@link #synchronize(Runnable)}. | |
+* | |
+* <p>Created by Geoff on 2014-07-30. | |
+*/ | |
+public interface Synchronizer { | |
+ | |
+ /** | |
+ * Executes {@code actionToPerformSynchronously} and returns only after it has completed. | |
+ * {@code EventSubscriber#handleEvent} throws an {@link IllegalStateException} if this method | |
+ * returns before the action has run. | |
+ * | |
+ * @param actionToPerformSynchronously the work to execute. | |
+ */ | |
+ public void synchronize(Runnable actionToPerformSynchronously); | |
+ | |
+ | |
+ /** Runs the action directly on the calling thread. */ | |
+ public static class Default implements Synchronizer{ | |
+ public void synchronize(Runnable actionToPerformSynchronously) { | |
+ actionToPerformSynchronously.run(); | |
+ } | |
+ } | |
+ | |
+ /** Shared {@link Default} instance, used when a subscriber declares no {@code OnThread}. */ | |
+ public static Default DEFAULT_INSTANCE = new Default(); | |
+} | |
diff --git a/guava/src/com/google/common/reflect/Types.java b/guava/src/com/google/common/reflect/Types.java | |
index d6c56e8..a1dabfc 100644 | |
--- a/guava/src/com/google/common/reflect/Types.java | |
+++ b/guava/src/com/google/common/reflect/Types.java | |
@@ -29,7 +29,9 @@ import com.google.common.collect.ImmutableList; | |
import com.google.common.collect.Iterables; | |
import java.io.Serializable; | |
+import java.lang.annotation.Annotation; | |
import java.lang.reflect.AnnotatedElement; | |
+import java.lang.reflect.AnnotatedType; | |
import java.lang.reflect.Array; | |
import java.lang.reflect.GenericArrayType; | |
import java.lang.reflect.GenericDeclaration; | |
@@ -343,7 +345,12 @@ final class Types { | |
return name; | |
} | |
- @Override public String toString() { | |
+ @Override | |
+ public AnnotatedType[] getAnnotatedBounds() { | |
+ throw new UnsupportedOperationException("getAnnotatedBounds"); | |
+ } | |
+ | |
+ @Override public String toString() { | |
return name; | |
} | |
@@ -371,6 +378,21 @@ final class Types { | |
return false; | |
} | |
} | |
+ | |
+ @Override | |
+ public <T extends Annotation> T getAnnotation(Class<T> annotationClass) { | |
+ throw new UnsupportedOperationException("getAnnotation"); | |
+ } | |
+ | |
+ @Override | |
+ public Annotation[] getAnnotations() { | |
+ throw new UnsupportedOperationException("getAnnotations"); | |
+ } | |
+ | |
+ @Override | |
+ public Annotation[] getDeclaredAnnotations() { | |
+ throw new UnsupportedOperationException("getDeclaredAnnotations"); | |
+ } | |
} | |
static final class WildcardTypeImpl implements WildcardType, Serializable { | |
diff --git a/groostav-guava-plantation.diff b/groostav-guava-plantation.diff | |
new file mode 100644 | |
index 0000000..a0cbaa0 | |
Binary files /dev/null and b/groostav-guava-plantation.diff differ | |
diff --git a/guava-testlib/src/com/google/common/testing/AbstractPackageSanityTests.java b/guava-testlib/src/com/google/common/testing/AbstractPackageSanityTests.java | |
index d324a38..48dfb78 100644 | |
--- a/guava-testlib/src/com/google/common/testing/AbstractPackageSanityTests.java | |
+++ b/guava-testlib/src/com/google/common/testing/AbstractPackageSanityTests.java | |
@@ -16,10 +16,6 @@ | |
package com.google.common.testing; | |
-import static com.google.common.base.Predicates.and; | |
-import static com.google.common.base.Predicates.not; | |
-import static com.google.common.testing.AbstractPackageSanityTests.Chopper.suffix; | |
- | |
import com.google.common.annotations.Beta; | |
import com.google.common.annotations.VisibleForTesting; | |
import com.google.common.base.Optional; | |
@@ -33,10 +29,8 @@ import com.google.common.collect.Multimap; | |
import com.google.common.collect.Sets; | |
import com.google.common.reflect.ClassPath; | |
import com.google.common.testing.NullPointerTester.Visibility; | |
- | |
import junit.framework.AssertionFailedError; | |
import junit.framework.TestCase; | |
- | |
import org.junit.Test; | |
import java.io.IOException; | |
@@ -47,6 +41,10 @@ import java.util.TreeMap; | |
import java.util.logging.Level; | |
import java.util.logging.Logger; | |
+import static com.google.common.base.Predicates.and; | |
+import static com.google.common.base.Predicates.not; | |
+import static com.google.common.testing.AbstractPackageSanityTests.Chopper.suffix; | |
+ | |
/** | |
* Automatically runs sanity checks against top level classes in the same package of the test that | |
* extends {@code AbstractPackageSanityTests}. Currently sanity checks include {@link | |
diff --git a/guava-tests/test/com/google/common/eventbus/EventSubscriberTest.java b/guava-tests/test/com/google/common/eventbus/EventSubscriberTest.java | |
index b0a332a..682e474 100644 | |
--- a/guava-tests/test/com/google/common/eventbus/EventSubscriberTest.java | |
+++ b/guava-tests/test/com/google/common/eventbus/EventSubscriberTest.java | |
@@ -50,7 +50,7 @@ public class EventSubscriberTest extends TestCase { | |
public void testBasicMethodCall() throws Exception { | |
Method method = getRecordingMethod(); | |
- EventSubscriber subscriber = new EventSubscriber(this, method); | |
+ EventSubscriber subscriber = new EventSubscriber(this, method, Synchronizer.DEFAULT_INSTANCE); | |
subscriber.handleEvent(FIXTURE_ARGUMENT); | |
@@ -61,7 +61,7 @@ public class EventSubscriberTest extends TestCase { | |
public void testExceptionWrapping() { | |
Method method = getExceptionThrowingMethod(); | |
- EventSubscriber subscriber = new EventSubscriber(this, method); | |
+ EventSubscriber subscriber = new EventSubscriber(this, method, Synchronizer.DEFAULT_INSTANCE); | |
try { | |
subscriber.handleEvent(new Object()); | |
@@ -74,7 +74,7 @@ public class EventSubscriberTest extends TestCase { | |
public void testErrorPassthrough() throws InvocationTargetException { | |
Method method = getErrorThrowingMethod(); | |
- EventSubscriber subscriber = new EventSubscriber(this, method); | |
+ EventSubscriber subscriber = new EventSubscriber(this, method, Synchronizer.DEFAULT_INSTANCE); | |
try { | |
subscriber.handleEvent(new Object()); | |
@@ -89,13 +89,17 @@ public class EventSubscriberTest extends TestCase { | |
Method concat = String.class.getMethod("concat", String.class); | |
new EqualsTester() | |
.addEqualityGroup( | |
- new EventSubscriber("foo", charAt), new EventSubscriber("foo", charAt)) | |
- .addEqualityGroup(new EventSubscriber("bar", charAt)) | |
- .addEqualityGroup(new EventSubscriber("foo", concat)) | |
+ makeSubscriber("foo", charAt), makeSubscriber("foo", charAt)) | |
+ .addEqualityGroup(makeSubscriber("bar", charAt)) | |
+ .addEqualityGroup(makeSubscriber("foo", concat)) | |
.testEquals(); | |
} | |
- /** | |
+ private EventSubscriber makeSubscriber(String target, Method method) { | |
+ return new EventSubscriber(target, method, Synchronizer.DEFAULT_INSTANCE); | |
+ } | |
+ | |
+ /** | |
* Gets a reference to {@link #recordingMethod(Object)}. | |
* | |
* @return a Method wrapping {@link #recordingMethod(Object)}. | |
diff --git a/guava-tests/test/com/google/common/eventbus/ExplicitThreadSupplierTest.java b/guava-tests/test/com/google/common/eventbus/ExplicitThreadSupplierTest.java | |
new file mode 100644 | |
index 0000000..24ba23e | |
--- /dev/null | |
+++ b/guava-tests/test/com/google/common/eventbus/ExplicitThreadSupplierTest.java | |
@@ -0,0 +1,119 @@ | |
+package com.google.common.eventbus; | |
+ | |
+import com.sun.javafx.application.PlatformImpl; | |
+import javafx.application.Platform; | |
+import junit.framework.TestCase; | |
+ | |
+import java.lang.annotation.Retention; | |
+import java.lang.annotation.RetentionPolicy; | |
+import java.util.concurrent.CountDownLatch; | |
+ | |
+/** | |
+ * Created by Geoff on 2014-07-30. | |
+ */ | |
+public class ExplicitThreadSupplierTest extends TestCase { | |
+ | |
+ private boolean messageRecieved = false; | |
+ private Thread threadUsed = null; | |
+ private boolean wasRecievedOnJavaFXThread = false; | |
+ | |
+  /** | |
+   * Test {@link Synchronizer} that runs the supplied action on the JavaFX application thread and | |
+   * blocks the caller until that action has finished. | |
+   */ | |
+  public static class FXThreadSynchronizer implements Synchronizer { | |
+ | |
+    public static final Runnable DoNothing = new Runnable() { public void run() {} }; | |
+ | |
+    static { | |
+      // Boots the JavaFX runtime so that Platform.runLater has a thread to dispatch to. | |
+      PlatformImpl.startup(DoNothing); | |
+    } | |
+ | |
+    /** | |
+     * Runs {@code actionToPerformSynchronously} on the JavaFX application thread, returning only | |
+     * once it has completed. | |
+     * | |
+     * @param actionToPerformSynchronously the work to execute on the JavaFX application thread. | |
+     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is | |
+     *     interrupted while waiting; the interrupt flag is restored first. | |
+     */ | |
+    @Override | |
+    public void synchronize(final Runnable actionToPerformSynchronously) { | |
+ | |
+      // Already on the FX thread: queueing the action and then waiting for it here would | |
+      // deadlock, because the thread that has to run it is the one that is blocked. | |
+      if (Platform.isFxApplicationThread()) { | |
+        actionToPerformSynchronously.run(); | |
+        return; | |
+      } | |
+ | |
+      final CountDownLatch latch = new CountDownLatch(1); | |
+ | |
+      Platform.runLater(new Runnable() { | |
+        @Override | |
+        public void run() { | |
+          try { | |
+            actionToPerformSynchronously.run(); | |
+          } finally { | |
+            // Release the waiting caller even when the action throws. | |
+            latch.countDown(); | |
+          } | |
+        } | |
+      }); | |
+ | |
+      try { | |
+        latch.await(); | |
+      } catch (InterruptedException e) { | |
+        Thread.currentThread().interrupt(); | |
+        throw new RuntimeException(e); | |
+      } | |
+    } | |
+  } | |
+ | |
+ public class FXSubscriber { | |
+ @Subscribe | |
+ @OnThread(FXThreadSynchronizer.class) | |
+ public void handle(String message) { | |
+ wasRecievedOnJavaFXThread = Platform.isFxApplicationThread(); | |
+ messageRecieved = true; | |
+ } | |
+ } | |
+ | |
+ public void testExplicitHandlingThread() { | |
+ //setup | |
+ EventBus eventBus = new EventBus(); | |
+ eventBus.register(new FXSubscriber()); | |
+ | |
+ //act | |
+ eventBus.post("testing my cool new feature!"); | |
+ | |
+ //assert | |
+ assertTrue(messageRecieved); | |
+ assertTrue(wasRecievedOnJavaFXThread); | |
+ } | |
+ | |
+ | |
+ public class NoExplicitThreadSubscriber { | |
+ @Subscribe | |
+ public void handle(String message) { | |
+ threadUsed = Thread.currentThread(); | |
+ messageRecieved = true; | |
+ } | |
+ } | |
+ | |
+  /** A subscriber without {@link OnThread} must be invoked on the thread that posted the event. */ | |
+  public void testImplicitHandlingThread() { | |
+    //setup | |
+    EventBus eventBus = new EventBus(); | |
+    eventBus.register(new NoExplicitThreadSubscriber()); | |
+ | |
+    //act | |
+    eventBus.post("testing the old features!"); | |
+ | |
+    //assert | |
+    assertTrue(messageRecieved); | |
+    // JUnit's signature is assertEquals(expected, actual). | |
+    assertEquals(Thread.currentThread(), threadUsed); | |
+  } | |
+ | |
+ @Retention(RetentionPolicy.RUNTIME) | |
+ @OnThread(FXThreadSynchronizer.class) | |
+ public static @interface OnJavaFXThread { | |
+ } | |
+ | |
+ public class ExplicitByCustomAnnotationSubscriber { | |
+ @Subscribe | |
+ @OnJavaFXThread | |
+ public void handle(String message) { | |
+ messageRecieved = true; | |
+ wasRecievedOnJavaFXThread = Platform.isFxApplicationThread(); | |
+ } | |
+ } | |
+ | |
+ public void testThreadImpliedByAnnotatedAnnotation() { | |
+ //setup | |
+ EventBus eventbus = new EventBus(); | |
+ eventbus.register(new ExplicitByCustomAnnotationSubscriber()); | |
+ | |
+ //act | |
+ eventbus.post("testing a wierder version of my new feature which might just be jumping the shark!"); | |
+ | |
+ //assert | |
+ assertTrue(messageRecieved); | |
+ assertTrue(wasRecievedOnJavaFXThread); | |
+ } | |
+} | |
diff --git a/guava-tests/test/com/google/common/eventbus/PackageSanityTests.java b/guava-tests/test/com/google/common/eventbus/PackageSanityTests.java | |
index 5f984b0..48fbdcc 100644 | |
--- a/guava-tests/test/com/google/common/eventbus/PackageSanityTests.java | |
+++ b/guava-tests/test/com/google/common/eventbus/PackageSanityTests.java | |
@@ -18,9 +18,8 @@ package com.google.common.eventbus; | |
import com.google.common.testing.AbstractPackageSanityTests; | |
-import java.lang.reflect.Method; | |
- | |
import javax.annotation.Nullable; | |
+import java.lang.reflect.Method; | |
/** | |
* Basic sanity tests for the entire package. | |
@@ -41,7 +40,7 @@ public class PackageSanityTests extends AbstractPackageSanityTests { | |
public void handle(@Nullable Object anything) {} | |
EventSubscriber toEventSubscriber() throws Exception { | |
- return new EventSubscriber(this, subscriberMethod()); | |
+ return new EventSubscriber(this, subscriberMethod(), Synchronizer.DEFAULT_INSTANCE); | |
} | |
private static Method subscriberMethod() throws NoSuchMethodException { | |
diff --git a/guava/src/com/google/common/eventbus/AnnotatedSubscriberFinder.java b/guava/src/com/google/common/eventbus/AnnotatedSubscriberFinder.java | |
index b1c38ff..aa2fbb9 100644 | |
--- a/guava/src/com/google/common/eventbus/AnnotatedSubscriberFinder.java | |
+++ b/guava/src/com/google/common/eventbus/AnnotatedSubscriberFinder.java | |
@@ -28,14 +28,14 @@ import com.google.common.collect.Multimap; | |
import com.google.common.reflect.TypeToken; | |
import com.google.common.util.concurrent.UncheckedExecutionException; | |
+import javax.annotation.Nullable; | |
+import java.lang.annotation.Annotation; | |
import java.lang.reflect.Method; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Set; | |
-import javax.annotation.Nullable; | |
- | |
/** | |
* A {@link SubscriberFindingStrategy} for collecting all event subscriber methods that are marked | |
* with the {@link Subscribe} annotation. | |
@@ -147,15 +147,62 @@ class AnnotatedSubscriberFinder implements SubscriberFindingStrategy { | |
*/ | |
private static EventSubscriber makeSubscriber(Object listener, Method method) { | |
EventSubscriber wrapper; | |
+ | |
+ Synchronizer threadSynchronizer = getThreadSynchronizer(method); | |
+ | |
if (methodIsDeclaredThreadSafe(method)) { | |
- wrapper = new EventSubscriber(listener, method); | |
+ wrapper = new EventSubscriber(listener, method, threadSynchronizer); | |
} else { | |
- wrapper = new SynchronizedEventSubscriber(listener, method); | |
+ wrapper = new SynchronizedEventSubscriber(listener, method, threadSynchronizer); | |
} | |
+ | |
return wrapper; | |
} | |
- /** | |
+ private static Synchronizer getThreadSynchronizer(Method method) { | |
+ | |
+ OnThread methodAnnotation = method.getAnnotation(OnThread.class); | |
+ Annotation annotatedAnnotation = null; | |
+ | |
+ for(Annotation annotation : method.getAnnotations()){ | |
+ | |
+ OnThread annotationFromAnnotation = annotation.annotationType().getAnnotation(OnThread.class); | |
+ | |
+ if(annotationFromAnnotation != null && methodAnnotation != null){ | |
+ throw new IllegalStateException( | |
+ "found an " + OnThread.class.getSimpleName() + " annotation " + | |
+ "on both '" + method + "' and '" + annotation.annotationType() + "'!"); | |
+ } | |
+ if(annotationFromAnnotation != null && annotatedAnnotation != null){ | |
+ throw new IllegalStateException( | |
+ "found an " + OnThread.class.getSimpleName() + " annotation " + | |
+ "on both '" + annotatedAnnotation.annotationType() + "' and '" + annotation.annotationType() + "'!"); | |
+ } | |
+ if(annotationFromAnnotation != null){ | |
+ annotatedAnnotation = annotation; | |
+ } | |
+ } | |
+ | |
+ OnThread threadAnnotation = null; | |
+ if(methodAnnotation != null){ | |
+ threadAnnotation = methodAnnotation; | |
+ } | |
+ else if(annotatedAnnotation != null){ | |
+ threadAnnotation = annotatedAnnotation.annotationType().getAnnotation(OnThread.class); | |
+ } | |
+ | |
+ Synchronizer threadSynchronizer; | |
+ if(threadAnnotation != null) try{ | |
+ threadSynchronizer = threadAnnotation.value().newInstance(); | |
+ } catch (ReflectiveOperationException e) { | |
+ throw new RuntimeException(e); | |
+ } else { | |
+ threadSynchronizer = Synchronizer.DEFAULT_INSTANCE; | |
+ } | |
+ return threadSynchronizer; | |
+ } | |
+ | |
+ /** | |
* Checks whether {@code method} is thread-safe, as indicated by the | |
* {@link AllowConcurrentEvents} annotation. | |
* | |
diff --git a/guava/src/com/google/common/eventbus/EventSubscriber.java b/guava/src/com/google/common/eventbus/EventSubscriber.java | |
index 058aeab..3f2cb4f 100644 | |
--- a/guava/src/com/google/common/eventbus/EventSubscriber.java | |
+++ b/guava/src/com/google/common/eventbus/EventSubscriber.java | |
@@ -16,14 +16,16 @@ | |
package com.google.common.eventbus; | |
-import static com.google.common.base.Preconditions.checkNotNull; | |
- | |
import com.google.common.base.Preconditions; | |
+import com.google.common.util.concurrent.Atomics; | |
+import javax.annotation.Nullable; | |
import java.lang.reflect.InvocationTargetException; | |
import java.lang.reflect.Method; | |
+import java.util.concurrent.atomic.AtomicBoolean; | |
+import java.util.concurrent.atomic.AtomicReference; | |
-import javax.annotation.Nullable; | |
+import static com.google.common.base.Preconditions.checkNotNull; | |
/** | |
* Wraps a single-argument subscriber method on a specific object. | |
@@ -44,19 +46,23 @@ class EventSubscriber { | |
/** Subscriber method. */ | |
private final Method method; | |
+ private final Synchronizer threadSynchronizer; | |
+ | |
/** | |
* Creates a new EventSubscriber to wrap {@code method} on @{code target}. | |
* | |
* @param target object to which the method applies. | |
* @param method subscriber method. | |
*/ | |
- EventSubscriber(Object target, Method method) { | |
- Preconditions.checkNotNull(target, | |
- "EventSubscriber target cannot be null."); | |
+ EventSubscriber(Object target, Method method, Synchronizer threadSynchronizer) { | |
+ Preconditions.checkNotNull(target, "EventSubscriber target cannot be null."); | |
Preconditions.checkNotNull(method, "EventSubscriber method cannot be null."); | |
+ Preconditions.checkNotNull(threadSynchronizer, "EventSubscriber threadSynchronizer cannot be null."); | |
this.target = target; | |
this.method = method; | |
+ this.threadSynchronizer = threadSynchronizer; | |
+ | |
method.setAccessible(true); | |
} | |
@@ -68,20 +74,47 @@ class EventSubscriber { | |
* {@link Throwable} that is not an {@link Error} ({@code Error} instances are | |
* propagated as-is). | |
*/ | |
- public void handleEvent(Object event) throws InvocationTargetException { | |
+ public void handleEvent(final Object event) throws InvocationTargetException { | |
checkNotNull(event); | |
- try { | |
- method.invoke(target, new Object[] { event }); | |
- } catch (IllegalArgumentException e) { | |
- throw new Error("Method rejected target/argument: " + event, e); | |
- } catch (IllegalAccessException e) { | |
- throw new Error("Method became inaccessible: " + event, e); | |
- } catch (InvocationTargetException e) { | |
- if (e.getCause() instanceof Error) { | |
- throw (Error) e.getCause(); | |
+ | |
+ final AtomicBoolean finished = new AtomicBoolean(false); | |
+ final AtomicReference<Exception> problem = new AtomicReference<Exception>(null); | |
+ | |
+ threadSynchronizer.synchronize(new Runnable() { | |
+ @Override | |
+ public void run() { | |
+ try{ | |
+ method.invoke(target, new Object[] { event }); | |
+ } catch (Exception e){ | |
+ problem.set(e); | |
+ } finally { | |
+ finished.set(true); | |
+ } | |
} | |
- throw e; | |
+ }); | |
+ | |
+ if( ! finished.get()){ | |
+ throw new IllegalStateException("synchronizer '" + threadSynchronizer + "' " + | |
+ "specified on the subscribing method '" + method + "' " + | |
+ "must block the calling thread until the provided action has completed, " + | |
+ "but it did not."); | |
} | |
+ | |
+ Exception thrownException = problem.get(); | |
+ if(thrownException != null){ | |
+ | |
+ if(thrownException instanceof IllegalArgumentException) { | |
+ throw new Error("Method rejected target/argument: " + event, thrownException); | |
+ } else if (thrownException instanceof IllegalAccessException) { | |
+ throw new Error("Method became inaccessible: " + event, thrownException); | |
+ } else if (thrownException instanceof InvocationTargetException) { | |
+ if (thrownException.getCause() instanceof Error) { | |
+ throw (Error) thrownException.getCause(); | |
+ } | |
+ throw (InvocationTargetException) thrownException; | |
+ } | |
+ } | |
+ | |
} | |
@Override public String toString() { | |
diff --git a/guava/src/com/google/common/eventbus/OnThread.java b/guava/src/com/google/common/eventbus/OnThread.java | |
new file mode 100644 | |
index 0000000..86f0a45 | |
--- /dev/null | |
+++ b/guava/src/com/google/common/eventbus/OnThread.java | |
@@ -0,0 +1,51 @@ | |
+package com.google.common.eventbus; | |
+ | |
+import java.lang.annotation.Retention; | |
+import java.lang.annotation.Target; | |
+ | |
+import static java.lang.annotation.ElementType.*; | |
+import static java.lang.annotation.RetentionPolicy.RUNTIME; | |
+ | |
+/** | |
+ * Annotation to allow execution of handlers on threads other than the one the poster is on. | |
+ * | |
+ * <P>Subscribers that are annotated with this annotation in addition to the {@link Subscribe} | |
+ * annotation will be executed with the specified {@link com.google.common.eventbus.Synchronizer}.</P> | |
+ * | |
+ * <P>For example, consider an event handler that will modify Swing view code. To respect | |
+ * UI threading, any code that modifies view fields must be wrapped in a call to | |
+ * {@link javax.swing.SwingUtilities#invokeAndWait(Runnable)} or | |
+ * {@link javax.swing.SwingUtilities#invokeLater(Runnable)}. This requires a consistent amount | |
+ * of confusing boilerplate code, which is an example of the problems this annotation was created to | |
+ * address.</P> | |
+ * | |
+ * <P>Consider the Synchronizer | |
+ * <pre>{@code | |
+ * class SwingThreadSynchronizer implements Synchronizer{ | |
+ * @literal@Override | |
+ * public void synchronize(Runnable runnable){ | |
+ * SwingUtilities.invokeAndWait(runnable); | |
+ * } | |
+ * } | |
+ * }</pre> | |
+ * With this class, to solve the above problem, a controller that modifies its view | |
+ * should add this annotation to its subscribers: | |
+ * <pre>{@code | |
+ * @literal@Subscribe | |
+ * @literal@OnThread(SwingThreadSynchronizer.class) | |
+ * public void onViewRelevantModelChange(YourModelEvent event){ | |
+ * view.information.setText(deriveTextFromModel()); | |
+ * //... other code that would cause concurrency issues if not run on a Swing thread. | |
+ * } | |
+ * }</pre> | |
+ * </P> | |
+ * | |
+ * @see com.google.common.eventbus.Synchronizer | |
+ * | |
+ * Created by Geoff Groos ([email protected]) on 2014-07-30. | |
+ */ | |
+@Target({METHOD, ANNOTATION_TYPE}) | |
+@Retention(RUNTIME) | |
+public @interface OnThread { | |
+ public Class<? extends Synchronizer> value() default Synchronizer.Default.class; | |
+} | |
diff --git a/guava/src/com/google/common/eventbus/SynchronizedEventSubscriber.java b/guava/src/com/google/common/eventbus/SynchronizedEventSubscriber.java | |
index a074818..66fb2e6 100644 | |
--- a/guava/src/com/google/common/eventbus/SynchronizedEventSubscriber.java | |
+++ b/guava/src/com/google/common/eventbus/SynchronizedEventSubscriber.java | |
@@ -36,8 +36,8 @@ final class SynchronizedEventSubscriber extends EventSubscriber { | |
* @param target object to which the method applies. | |
* @param method subscriber method. | |
*/ | |
- public SynchronizedEventSubscriber(Object target, Method method) { | |
- super(target, method); | |
+ public SynchronizedEventSubscriber(Object target, Method method, Synchronizer threadSynchronizer) { | |
+ super(target, method, threadSynchronizer); | |
} | |
@Override | |
diff --git a/guava/src/com/google/common/eventbus/Synchronizer.java b/guava/src/com/google/common/eventbus/Synchronizer.java | |
new file mode 100644 | |
index 0000000..e3f1919 | |
--- /dev/null | |
+++ b/guava/src/com/google/common/eventbus/Synchronizer.java | |
@@ -0,0 +1,16 @@ | |
+package com.google.common.eventbus; | |
+ | |
+/** | |
+ * Created by Geoff Groos ([email protected]) on 2014-07-30. | |
+ */ | |
+public interface Synchronizer { | |
+ | |
+ public void synchronize(Runnable actionToPerformSynchronously); | |
+ | |
+ public static Default DEFAULT_INSTANCE = new Default(); | |
+ public static class Default implements Synchronizer { | |
+ public void synchronize(Runnable actionToPerformSynchronously) { | |
+ actionToPerformSynchronously.run(); | |
+ } | |
+ } | |
+} | |
diff --git a/guava/src/com/google/common/reflect/Types.java b/guava/src/com/google/common/reflect/Types.java | |
index d6c56e8..cdd4ffb 100644 | |
--- a/guava/src/com/google/common/reflect/Types.java | |
+++ b/guava/src/com/google/common/reflect/Types.java | |
@@ -29,7 +29,9 @@ import com.google.common.collect.ImmutableList; | |
import com.google.common.collect.Iterables; | |
import java.io.Serializable; | |
+import java.lang.annotation.Annotation; | |
import java.lang.reflect.AnnotatedElement; | |
+import java.lang.reflect.AnnotatedType; | |
import java.lang.reflect.Array; | |
import java.lang.reflect.GenericArrayType; | |
import java.lang.reflect.GenericDeclaration; | |
@@ -343,9 +345,12 @@ final class Types { | |
return name; | |
} | |
+ @Override | |
+ public AnnotatedType[] getAnnotatedBounds() { throw new UnsupportedOperationException("getAnnotatedBounds");} | |
+ | |
@Override public String toString() { | |
return name; | |
- } | |
+} | |
@Override public int hashCode() { | |
return genericDeclaration.hashCode() ^ name.hashCode(); | |
@@ -371,6 +376,21 @@ final class Types { | |
return false; | |
} | |
} | |
+ | |
+ @Override | |
+ public <T extends Annotation> T getAnnotation(Class<T> annotationClass) { | |
+ throw new UnsupportedOperationException("getAnnotation"); | |
+ } | |
+ | |
+ @Override | |
+ public Annotation[] getAnnotations() { | |
+ throw new UnsupportedOperationException("getAnnotations"); | |
+ } | |
+ | |
+ @Override | |
+ public Annotation[] getDeclaredAnnotations() { | |
+ throw new UnsupportedOperationException("getDeclaredAnnotations"); | |
+ } | |
} | |
static final class WildcardTypeImpl implements WildcardType, Serializable { |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment