Last active
May 4, 2022 16:13
-
-
Save pivovarit/0cc15abea9ecd75a8c4f742b1f1d4b22 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.pivovarit; | |
import java.util.Arrays; | |
import java.util.Objects; | |
/** | |
* Conveys a piece of data between one producer thread and arbitrarily many | |
* consumer threads. The producer may at any time call | |
* {@link #setSerialized(byte[])} to publish an object in serialized form. | |
* After that any consumer may call {@link #getDeserialized()} any number of | |
* times and it will retrieve the deserialized object. | |
* | |
* <h3>Assumptions on the use case</h3> | |
* <ol><li> | |
* The producer thread has many responsibilities, its time spent at any one | |
* task must be minimized. | |
* </li><li> | |
* Deserialization is expensive, therefore the producer must be relieved from | |
* it. | |
* </li><li> | |
* The usage pattern is read-heavy: there are many more invocations of | |
* {@link #getDeserialized()} than of {@link #setSerialized(byte[])}. | |
* </li><li> | |
* Each returned instance will probably be retained on the heap for a long time. | |
* </li></ol> | |
* | |
* <h3>Desired characteristics of the implementation</h3> | |
* <ol><li> | |
* Deserialization is lazy: if there is no invocation of {@link #getDeserialized()}, | |
* then no deserialization happens. | |
* </li><li> | |
* Once a consumer deserializes the value, this object caches and shares it with | |
* future consumers. | |
* </li><li> | |
* The invocations of {@link #getDeserialized()} will return at most as many | |
* distinct instances as there were invocations of | |
* {@link #setSerialized(byte[])}. | |
* </li><li> | |
* {@link #setSerialized(byte[])} is wait-free: it always completes in a finite | |
* number of steps, regardless of any concurrent invocations of | |
* {@link #getDeserialized()}. | |
* </li><li> | |
* {@link #getDeserialized()} is wait-free with respect to | |
* {@link #setSerialized(byte[])}: the producer thread may do anything, such | |
* as calling {@link #setSerialized(byte[])} at a very high rate or getting | |
* indefinitely suspended within an invocation, without affecting the ability | |
* of {@link #getDeserialized()} to complete in a finite number of steps. | |
* </li><li> | |
* {@link #getDeserialized()} is also wait-free against itself (concurrent | |
* invocations don't interfere with each other), with one allowed exception: | |
* when it observes a new serialized value, it may choose to block some of | |
* the other invocations of {@link #getDeserialized()} until it completes. | |
* More formally, after the following sequence of events has occurred: | |
* <ol><li> | |
* the producer completes its last invocation of {@link #setSerialized(byte[])}; | |
* </li><li> | |
* a consumer starts an invocation of {@link #getDeserialized()}; | |
* </li><li> | |
* the invocation completes by returning the object deserialized from the | |
* blob set by that last invocation, | |
* </li></ol> | |
* all future invocations of {@link #getDeserialized()} are wait-free. Note | |
* that an implementation without the above exception is also possible. | |
* </li><li> | |
* {@link #getDeserialized()} exhibits (at least) <em>eventually consistent, | |
* monotonic read</em> behavior: once a consumer has observed an object derived | |
* from a serialized value S, it will never observe an object derived from a | |
* serialized value older than S, nor will it observe the initial {@code null} | |
* value. If at any point the producer invokes {@link #setSerialized(byte[])} | |
* for the last time and the consumer keeps invoking {@link #getDeserialized()}, | |
* eventually it will return an object deserialized from that final producer's | |
* invocation. | |
* </li></ol> | |
*/ | |
public class ConcurrentDeserializer { | |
private volatile DeserializedValue value; | |
/** | |
* Sets a new serialized value. Exclusively called by a single producer | |
* thread. | |
*/ | |
public void setSerialized(byte[] blob) { | |
Objects.requireNonNull(blob); | |
value = new DeserializedValue(Arrays.copyOf(blob, blob.length)); | |
} | |
/** | |
* Returns the result of deserializing a blob previously set by the | |
* producer thread. Called by arbitrarily many consumer threads. Initially | |
* (before the first invocation of {@link #setSerialized(byte[])}) the | |
* method returns {@code null}. | |
*/ | |
public Object getDeserialized() { | |
return value == null ? null : value.get(); | |
} | |
// Details of deserialization are out of scope for this assignment. | |
// You may use this mock implementation: | |
private Object deserialize(byte[] blob) { | |
if (blob == null) { | |
return null; | |
} | |
return new String(blob); | |
} | |
private final class DeserializedValue { | |
private volatile byte[] payload; | |
private volatile Object deserialized; | |
private final Object lock = new Object(); | |
public DeserializedValue(byte[] payload) { | |
this.payload = payload; | |
} | |
public Object get() { | |
if (deserialized == null) { | |
synchronized (lock) { | |
if (deserialized == null) { | |
deserialized = deserialize(payload); | |
payload = null; | |
} | |
} | |
} | |
return deserialized; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment