Last active
April 16, 2022 14:20
-
-
Save bxb100/fe1de0787b47ffe7ca364439b81b517b to your computer and use it in GitHub Desktop.
see guarded object gist.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.pers.cook.thread; | |
import java.util.Objects; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.concurrent.atomic.AtomicInteger; | |
/** | |
* Guarded main demo. | |
* | |
* @author Xiaobo Bi ([email protected]) | |
*/ | |
public class GuardedMain { | |
private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger(); | |
public static void main(String[] args) { | |
for (int i = 0; i < 10; i++) { | |
new Thread(() -> { | |
int id = ATOMIC_INTEGER.getAndIncrement(); | |
// 创建一信息 | |
Message msg1 = new Message(id, "{}" + id); | |
GuardedObject<Message> go = GuardedObject.create(id); | |
// 发送消息, 这里就是开启线程 | |
sendMessage(msg1); | |
// 等待 MQ 信息 | |
Message message = go.get(Objects::nonNull); | |
System.out.println(message); | |
}).start(); | |
} | |
} | |
static void sendMessage(Message message) { | |
CompletableFuture.runAsync(() -> { | |
Message returnMessage = new Message(message.id, "null"); | |
try { | |
// 模拟处理耗时 | |
int abs = (int) ((Math.random() + 1) * 100); | |
System.out.println(message.id + "休眠" + abs + "s"); | |
Thread.sleep(abs); | |
returnMessage = new Message(message.id, "返回" + message.msg); | |
} catch (Exception e) { | |
returnMessage = new Message(message.id, "异常" + e); | |
} finally { | |
GuardedMain.onMessage(returnMessage); | |
} | |
}); | |
} | |
static void onMessage(Message message) { | |
GuardedObject.fireEvent(message.id, message); | |
} | |
static class Message { | |
private int id; | |
private String msg; | |
Message(int id, String msg) { | |
this.id = id; | |
this.msg = msg; | |
} | |
public int getId() { | |
return id; | |
} | |
public void setId(int id) { | |
this.id = id; | |
} | |
public String getMsg() { | |
return msg; | |
} | |
public void setMsg(String msg) { | |
this.msg = msg; | |
} | |
@Override | |
public String toString() { | |
return "Message{" + | |
"id=" + id + | |
", msg='" + msg + '\'' + | |
'}'; | |
} | |
} | |
} | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.pers.cook.thread; | |
import java.util.Map; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.locks.Condition; | |
import java.util.concurrent.locks.Lock; | |
import java.util.concurrent.locks.ReentrantLock; | |
import java.util.function.Predicate; | |
/** | |
* Guarded object demo. 异步转同步的方法. | |
* | |
* @author Xiaobo Bi ([email protected]) | |
*/ | |
class GuardedObject<T> { | |
private final static Map<Object, GuardedObject> GOS = new ConcurrentHashMap<>(); | |
private final Lock lock = new ReentrantLock(); | |
private final Condition done = lock.newCondition(); | |
/** | |
* 受保护的对象 | |
*/ | |
private T obj; | |
static <K, T> GuardedObject<T> create(K key) { | |
GuardedObject<T> object = new GuardedObject<>(); | |
GOS.put(key, object); | |
return object; | |
} | |
@SuppressWarnings("unchecked") | |
static <K, T> void fireEvent(K key, T obj) { | |
GuardedObject go = GOS.remove(key); | |
if (go != null) { | |
go.onChange(obj); | |
} | |
} | |
/** | |
* 获取受保护的对象 | |
* | |
* @param predicate 验证 lambda 有效性 | |
* @return pojo | |
*/ | |
T get(Predicate<T> predicate) { | |
lock.lock(); | |
try { | |
// MESA 管程推荐写法 | |
while (!predicate.test(obj)) { | |
done.await(2, TimeUnit.SECONDS); | |
} | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} finally { | |
lock.unlock(); | |
} | |
return obj; | |
} | |
/** | |
* 事件通知 | |
* | |
* @param obj pojo | |
*/ | |
private void onChange(T obj) { | |
lock.lock(); | |
try { | |
this.obj = obj; | |
done.signalAll(); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment