Skip to content

Instantly share code, notes, and snippets.

@bxb100
Last active April 16, 2022 14:20
Show Gist options
  • Save bxb100/fe1de0787b47ffe7ca364439b81b517b to your computer and use it in GitHub Desktop.
Save bxb100/fe1de0787b47ffe7ca364439b81b517b to your computer and use it in GitHub Desktop.
see guarded object gist.
package com.pers.cook.thread;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Guarded main demo.
*
* @author Xiaobo Bi ([email protected])
*/
public class GuardedMain {
private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger();
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
int id = ATOMIC_INTEGER.getAndIncrement();
// 创建一信息
Message msg1 = new Message(id, "{}" + id);
GuardedObject<Message> go = GuardedObject.create(id);
// 发送消息, 这里就是开启线程
sendMessage(msg1);
// 等待 MQ 信息
Message message = go.get(Objects::nonNull);
System.out.println(message);
}).start();
}
}
static void sendMessage(Message message) {
CompletableFuture.runAsync(() -> {
Message returnMessage = new Message(message.id, "null");
try {
// 模拟处理耗时
int abs = (int) ((Math.random() + 1) * 100);
System.out.println(message.id + "休眠" + abs + "s");
Thread.sleep(abs);
returnMessage = new Message(message.id, "返回" + message.msg);
} catch (Exception e) {
returnMessage = new Message(message.id, "异常" + e);
} finally {
GuardedMain.onMessage(returnMessage);
}
});
}
static void onMessage(Message message) {
GuardedObject.fireEvent(message.id, message);
}
static class Message {
private int id;
private String msg;
Message(int id, String msg) {
this.id = id;
this.msg = msg;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", msg='" + msg + '\'' +
'}';
}
}
}
package com.pers.cook.thread;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
/**
* Guarded object demo. 异步转同步的方法.
*
* @author Xiaobo Bi ([email protected])
*/
class GuardedObject<T> {
private final static Map<Object, GuardedObject> GOS = new ConcurrentHashMap<>();
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
/**
* 受保护的对象
*/
private T obj;
static <K, T> GuardedObject<T> create(K key) {
GuardedObject<T> object = new GuardedObject<>();
GOS.put(key, object);
return object;
}
@SuppressWarnings("unchecked")
static <K, T> void fireEvent(K key, T obj) {
GuardedObject go = GOS.remove(key);
if (go != null) {
go.onChange(obj);
}
}
/**
* 获取受保护的对象
*
* @param predicate 验证 lambda 有效性
* @return pojo
*/
T get(Predicate<T> predicate) {
lock.lock();
try {
// MESA 管程推荐写法
while (!predicate.test(obj)) {
done.await(2, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
return obj;
}
/**
* 事件通知
*
* @param obj pojo
*/
private void onChange(T obj) {
lock.lock();
try {
this.obj = obj;
done.signalAll();
} finally {
lock.unlock();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment