Skip to content

Instantly share code, notes, and snippets.

@wreulicke
Last active September 26, 2017 16:37
Show Gist options
  • Save wreulicke/a24540fdd4c56c7dd5b1278c6392fcbc to your computer and use it in GitHub Desktop.
Save wreulicke/a24540fdd4c56c7dd5b1278c6392fcbc to your computer and use it in GitHub Desktop.
疲れたよパトラッシュ・・・
/**
 * Exercises {@code ExecutionContextExecutorService}: each test stores {@code test=xyz} in the
 * MDC of the test thread, submits work to the wrapped pool and asserts which MDC value the
 * tasks observe.
 *
 * <p>The pool is shut down at the end of every test so its worker threads are not leaked.
 * This assumes the runner creates a fresh test instance (and therefore a fresh pool) for
 * each test method, as JUnit 4 does by default.
 */
@Slf4j
public class ExecutionContextExecutorServiceTest {

    /** Executor under test, wrapping a fixed pool of 10 threads. */
    ExecutorService service = new ExecutionContextExecutorService(Executors.newFixedThreadPool(10));

    /**
     * Plain {@code submit} calls: tasks submitted from the test thread are expected to see
     * {@code xyz}; a task submitted from inside another task is expected to see whatever that
     * task's MDC holds at submission time ({@code xyz}, then {@code zzz} after the put).
     * A put made inside a task must not leak into sibling tasks or the test thread.
     */
    @Test
    public void test() throws Exception {
        try {
            try (MDCCloseable ignored = MDC.putCloseable("test", "xyz")) {
                assertThat(MDC.get("test")).isEqualTo("xyz");
                Future<String> future1 = service.submit(() -> {
                    assertThat(service.submit(() -> MDC.get("test")).get()).isEqualTo("xyz");
                    MDC.put("test", "zzz");
                    assertThat(MDC.get("test")).isEqualTo("zzz");
                    assertThat(service.submit(() -> MDC.get("test")).get()).isEqualTo("zzz");
                    return MDC.get("test");
                });
                Future<String> future2 = service.submit(() -> MDC.get("test"));
                Future<String> future3 = service.submit(() -> MDC.get("test"));
                Future<String> future4 = service.submit(() -> MDC.get("test"));
                assertThat(future1.get()).isEqualTo("zzz");
                assertThat(future2.get()).isEqualTo("xyz");
                assertThat(future3.get()).isEqualTo("xyz");
                assertThat(future4.get()).isEqualTo("xyz");
            }
            // The MDCCloseable removed the key again on close.
            assertThat(MDC.get("test")).isNull();
        } finally {
            // Every submitted task has been awaited above (or the test already failed),
            // so shutting down here cannot reject work that is still to be queued.
            service.shutdown();
        }
    }

    /**
     * Same expectations as {@link #test()}, but driven through a {@link CompletableFuture}
     * chain that uses the executor under test for its async stages.
     *
     * <p>NOTE(review): {@code thenAccept}/{@code thenApply} are the non-async variants, so
     * they run on whichever thread completes the preceding stage, or on the calling thread
     * if that stage is already complete. The assertions presumably rely on the former;
     * confirm this cannot flake.
     */
    @Test
    public void testWithCompletableFuture() throws Exception {
        try {
            try (MDCCloseable ignored = MDC.putCloseable("test", "xyz")) {
                CompletableFuture<String> future1 =
                    CompletableFuture.supplyAsync(() -> MDC.get("test"), service)
                        .thenComposeAsync(xyz -> {
                            log.info("a");
                            assertThat(xyz).isEqualTo("xyz");
                            return CompletableFuture.supplyAsync(() -> MDC.get("test"), service);
                        }, service)
                        .thenAccept(xyz -> {
                            log.info("b");
                            assertThat(xyz).isEqualTo("xyz");
                            MDC.put("test", "zzz");
                            assertThat(MDC.get("test")).isEqualTo("zzz");
                        })
                        .thenApply(ignore -> MDC.get("test"));
                CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> MDC.get("test"), service);
                CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> MDC.get("test"), service);
                CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> MDC.get("test"), service);
                assertThat(future1.get()).isEqualTo("zzz");
                assertThat(future2.get()).isEqualTo("xyz");
                assertThat(future3.get()).isEqualTo("xyz");
                assertThat(future4.get()).isEqualTo("xyz");
            }
            assertThat(MDC.get("test")).isNull();
        } finally {
            // All futures were awaited above; release the pool's worker threads.
            service.shutdown();
        }
    }
}
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.slf4j.MDC.MDCCloseable;
import org.junit.Test;
/**
 * Demonstrates propagating the SLF4J MDC to pool threads through an explicitly opened
 * {@link MDCContext}: while a context is registered on the submitting thread, every task
 * handed to {@link MDCContextInheritableExecutorService} runs with the MDC map that the
 * context captured when it was created.
 */
@Slf4j
public class MDCLoggingTest {

    /** Context-propagating executor wrapping a fixed pool of 10 threads. */
    ExecutorService service = new MDCContextInheritableExecutorService(Executors.newFixedThreadPool(10));

    /**
     * Logs the {@code test} MDC key from the test thread and from (nested) tasks. The inline
     * comments record the value the author expects each line to log; nothing is asserted.
     *
     * <p>NOTE(review): this method neither waits for the submitted tasks nor shuts the pool
     * down, so it can return before every task has logged.
     */
    @Test
    public void test() {
        try (MDCCloseable closeable = MDC.putCloseable("test", "xyz")) {
            log.info(MDC.get("test")); // xyz
            try (MDCContext context = MDCContextInheritableExecutorService.setMDCContext(new MDCContext())) {
                service.execute(() -> {
                    log.info(MDC.get("test")); // xyz
                    service.execute(() -> {
                        log.info(MDC.get("test")); // xyz
                    });
                    MDC.put("test", "zzz");
                    log.info(MDC.get("test")); // zzz
                    // Nested tasks reuse the registered MDCContext, whose captured map is
                    // unaffected by the put above.
                    service.execute(() -> {
                        log.info(MDC.get("test")); // xyz !!
                    });
                });
                service.execute(() -> {
                    log.info(MDC.get("test")); // xyz
                });
                service.execute(() -> {
                    log.info(MDC.get("test")); // xyz
                });
                service.execute(() -> {
                    log.info(MDC.get("test")); // xyz
                });
            } finally {
                // Runs after MDCContext.close(), which clears the whole MDC.
                log.info(MDC.get("test")); // null
            }
            log.info(MDC.get("test")); // null
        }
    }

    /**
     * {@link ExecutorService} decorator that wraps every submitted task with the
     * {@link MDCContext} registered on the submitting thread, if any. Tasks submitted while
     * no context is registered are passed to the delegate unchanged. Lifecycle methods are
     * forwarded as-is.
     */
    @RequiredArgsConstructor
    static class MDCContextInheritableExecutorService implements ExecutorService {

        /** Executor that actually runs the (possibly wrapped) tasks. */
        private final ExecutorService service;

        /** Per-thread holder of the currently registered context; empty when none is registered. */
        public static final ThreadLocal<MDCContext> context = new ThreadLocal<>();

        /**
         * @return the context registered on the calling thread, or {@code null} if there is none
         */
        public static MDCContext getMDCContext() {
            return context.get();
        }

        /**
         * Registers {@code mdcContext} on the calling thread.
         *
         * @param mdcContext context to register
         * @return the same instance, so the call can be used as a try-with-resources resource
         */
        public static MDCContext setMDCContext(MDCContext mdcContext) {
            context.set(mdcContext);
            return mdcContext;
        }

        /** Removes the context registered on the calling thread, if any. */
        public static void clearContext() {
            context.remove();
        }

        @Override
        public void execute(Runnable command) {
            service.execute(wrap(command));
        }

        @Override
        public void shutdown() {
            service.shutdown();
        }

        @Override
        public List<Runnable> shutdownNow() {
            return service.shutdownNow();
        }

        @Override
        public boolean isShutdown() {
            return service.isShutdown();
        }

        @Override
        public boolean isTerminated() {
            return service.isTerminated();
        }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            return service.awaitTermination(timeout, unit);
        }

        @Override
        public <T> Future<T> submit(Callable<T> task) {
            return service.submit(wrap(task));
        }

        @Override
        public <T> Future<T> submit(Runnable task, T result) {
            return service.submit(wrap(task), result);
        }

        @Override
        public Future<?> submit(Runnable task) {
            return service.submit(wrap(task));
        }

        @Override
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
            return service.invokeAll(wrapAll(tasks));
        }

        @Override
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
                throws InterruptedException {
            return service.invokeAll(wrapAll(tasks), timeout, unit);
        }

        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
                throws InterruptedException, ExecutionException {
            return service.invokeAny(wrapAll(tasks));
        }

        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            return service.invokeAny(wrapAll(tasks), timeout, unit);
        }

        /** Wraps each task in iteration order; shared by the invokeAll/invokeAny overloads. */
        private <T> List<Callable<T>> wrapAll(Collection<? extends Callable<T>> tasks) {
            List<Callable<T>> wrapped = new ArrayList<>(tasks.size());
            for (Callable<T> task : tasks) {
                wrapped.add(wrap(task));
            }
            return wrapped;
        }

        /** Decorates {@code runnable} with the caller's registered context, if there is one. */
        private Runnable wrap(Runnable runnable) {
            MDCContext current = getMDCContext();
            if (current == null) {
                return runnable;
            }
            return current.with(runnable);
        }

        /** Decorates {@code callable} with the caller's registered context, if there is one. */
        private <T> Callable<T> wrap(Callable<T> callable) {
            MDCContext current = getMDCContext();
            if (current == null) {
                return callable;
            }
            return current.with(callable);
        }
    }

    /**
     * Snapshot of the creating thread's MDC that can be installed around a task.
     *
     * <p>While a decorated task runs, the executing thread's MDC is replaced by the snapshot
     * and this instance is registered as that thread's current context, so tasks submitted
     * from inside it are wrapped with the same snapshot. Both are put back to their previous
     * state when the task ends.
     */
    static class MDCContext implements AutoCloseable {

        /** Copy of the MDC taken on the constructing thread; may be {@code null}. */
        final Map<String, String> context = MDC.getCopyOfContextMap();

        /**
         * @param runnable task to decorate
         * @return a runnable that executes {@code runnable} with this snapshot installed
         */
        public Runnable with(Runnable runnable) {
            return () -> {
                Map<String, String> previousMap = MDC.getCopyOfContextMap();
                MDCContext previousContext = MDCContextInheritableExecutorService.getMDCContext();
                try {
                    // Installed inside the try so a failure half-way is still rolled back.
                    install();
                    runnable.run();
                } finally {
                    restore(previousMap, previousContext);
                }
            };
        }

        /**
         * @param callable task to decorate
         * @return a callable that executes {@code callable} with this snapshot installed
         */
        public <T> Callable<T> with(Callable<T> callable) {
            return () -> {
                Map<String, String> previousMap = MDC.getCopyOfContextMap();
                MDCContext previousContext = MDCContextInheritableExecutorService.getMDCContext();
                try {
                    install();
                    return callable.call();
                } finally {
                    restore(previousMap, previousContext);
                }
            };
        }

        /**
         * Unregisters the calling thread's current context and clears its entire MDC
         * (not only the entries captured by this snapshot).
         */
        @Override
        public void close() {
            MDCContextInheritableExecutorService.clearContext();
            MDC.clear();
        }

        /** Makes this snapshot the executing thread's MDC and its registered context. */
        private void install() {
            replaceMdc(context);
            MDCContextInheritableExecutorService.setMDCContext(this);
        }

        /** Puts the executing thread's MDC and registered context back to the given state. */
        private static void restore(Map<String, String> previousMap, MDCContext previousContext) {
            replaceMdc(previousMap);
            if (previousContext == null) {
                MDCContextInheritableExecutorService.clearContext();
            } else {
                MDCContextInheritableExecutorService.setMDCContext(previousContext);
            }
        }

        /** Replaces the current thread's MDC with {@code map}; {@code null} means "empty". */
        private static void replaceMdc(Map<String, String> map) {
            if (map == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(map);
            }
        }
    }
}
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.slf4j.MDC.MDCCloseable;
import org.junit.Test;
/**
 * Demonstrates implicit MDC propagation: every task handed to
 * {@link ProcessContextInheritableExecutorService} is wrapped, at submission time, with a
 * copy of the submitting thread's MDC.
 */
@Slf4j
public class MDCLoggingTest {

    /** Propagating executor on top of a fixed pool of 10 threads. */
    ExecutorService service = new ProcessContextInheritableExecutorService(Executors.newFixedThreadPool(10));

    /**
     * Logs the {@code test} MDC key from the test thread and from (nested) tasks; the
     * trailing comments note the value the author expects on each line.
     */
    @Test
    public void test() {
        try (MDCCloseable closeable = MDC.putCloseable("test", "xyz")) {
            log.info(MDC.get("test")); // xyz
            service.execute(() -> {
                log.info(MDC.get("test")); // xyz
                service.execute(() -> log.info(MDC.get("test"))); // xyz
                MDC.put("test", "zzz");
                log.info(MDC.get("test")); // zzz
                service.execute(() -> log.info(MDC.get("test"))); // zzz
            });
            service.execute(() -> log.info(MDC.get("test"))); // xyz
            service.execute(() -> log.info(MDC.get("test"))); // xyz
            service.execute(() -> log.info(MDC.get("test"))); // xyz
        }
        log.info(MDC.get("test")); // null
    }

    /**
     * {@link ExecutorService} decorator: task-accepting methods wrap their tasks in a new
     * {@link ProcessContext} before delegating; lifecycle methods delegate directly.
     */
    @RequiredArgsConstructor
    static class ProcessContextInheritableExecutorService implements ExecutorService {

        /** Delegate that runs the wrapped tasks. */
        private final ExecutorService service;

        @Override
        public void execute(Runnable command) {
            service.execute(wrap(command));
        }

        @Override
        public void shutdown() {
            service.shutdown();
        }

        @Override
        public List<Runnable> shutdownNow() {
            return service.shutdownNow();
        }

        @Override
        public boolean isShutdown() {
            return service.isShutdown();
        }

        @Override
        public boolean isTerminated() {
            return service.isTerminated();
        }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            return service.awaitTermination(timeout, unit);
        }

        @Override
        public <T> Future<T> submit(Callable<T> task) {
            return service.submit(wrap(task));
        }

        @Override
        public <T> Future<T> submit(Runnable task, T result) {
            return service.submit(wrap(task), result);
        }

        @Override
        public Future<?> submit(Runnable task) {
            return service.submit(wrap(task));
        }

        @Override
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
            return service.invokeAll(wrapAll(tasks));
        }

        @Override
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
                throws InterruptedException {
            return service.invokeAll(wrapAll(tasks), timeout, unit);
        }

        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
                throws InterruptedException, ExecutionException {
            return service.invokeAny(wrapAll(tasks));
        }

        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            return service.invokeAny(wrapAll(tasks), timeout, unit);
        }

        /** Wraps every task, in iteration order, into a pre-sized list. */
        private <T> List<Callable<T>> wrapAll(Collection<? extends Callable<T>> tasks) {
            List<Callable<T>> decorated = new ArrayList<>(tasks.size());
            for (Callable<T> task : tasks) {
                decorated.add(wrap(task));
            }
            return decorated;
        }

        /** Binds {@code runnable} to a snapshot of the calling thread's MDC. */
        private Runnable wrap(Runnable runnable) {
            ProcessContext snapshot = new ProcessContext();
            return snapshot.with(runnable);
        }

        /** Binds {@code callable} to a snapshot of the calling thread's MDC. */
        private <T> Callable<T> wrap(Callable<T> callable) {
            ProcessContext snapshot = new ProcessContext();
            return snapshot.with(callable);
        }
    }

    /**
     * Copy of the constructing thread's MDC. Decorated tasks run with that copy as the
     * executing thread's MDC, and the thread's previous MDC is put back afterwards.
     */
    static class ProcessContext {

        /** MDC copy taken when this object was constructed; may be {@code null}. */
        Map<String, String> context = MDC.getCopyOfContextMap();

        /**
         * @param runnable task to decorate
         * @return a runnable that runs {@code runnable} under the captured MDC
         */
        public Runnable with(Runnable runnable) {
            return () -> {
                Map<String, String> saved = MDC.getCopyOfContextMap();
                install(context);
                try {
                    runnable.run();
                } finally {
                    install(saved);
                }
            };
        }

        /**
         * @param callable task to decorate
         * @return a callable that runs {@code callable} under the captured MDC
         */
        public <T> Callable<T> with(Callable<T> callable) {
            return () -> {
                Map<String, String> saved = MDC.getCopyOfContextMap();
                install(context);
                try {
                    return callable.call();
                } finally {
                    install(saved);
                }
            };
        }

        /** Replaces the current thread's MDC with {@code map}, clearing it when {@code map} is null. */
        private static void install(Map<String, String> map) {
            if (map != null) {
                MDC.setContextMap(map);
            } else {
                MDC.clear();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment