Created
July 20, 2022 01:05
-
-
Save jyemin/ce8c44bc361a8e34946d52c43b1132f6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.mongodb.client.MongoClients; | |
import org.bson.BsonDocument; | |
import org.bson.BsonTimestamp; | |
import org.bson.Document; | |
import java.nio.ByteBuffer; | |
import java.nio.ByteOrder; | |
import java.time.ZoneId; | |
import java.time.ZonedDateTime; | |
import java.time.format.DateTimeFormatter; | |
import java.util.HexFormat; | |
import java.util.Objects; | |
import static java.time.Instant.ofEpochSecond; | |
public class ResumeTokenDemo { | |
public static void main(String[] args) throws InterruptedException { | |
var client = MongoClients.create("mongodb://localhost/?directConnection=false"); | |
var coll = client.getDatabase("test").getCollection("test"); | |
coll.insertOne(new Document()); | |
BsonDocument curResumeToken = null; | |
try (var cursor = coll.watch().cursor()) { | |
while (true) { | |
var nextEvent = cursor.tryNext(); | |
if (nextEvent != null) { | |
System.out.println(nextEvent.getDocumentKey()); | |
System.out.println(); | |
} | |
var latestResumeToken = cursor.getResumeToken(); | |
if (!Objects.equals(latestResumeToken, curResumeToken)) { | |
curResumeToken = latestResumeToken; | |
System.out.println(); | |
var latestOperationTime = getTimestampFromResumeToken(Objects.requireNonNull(curResumeToken)); | |
var formattedDate = ZonedDateTime.ofInstant(ofEpochSecond(latestOperationTime.getTime()), ZoneId.of(ZoneId.SHORT_IDS.get("EST"))) | |
.format(DateTimeFormatter.ofPattern("u-M-d hh:mm:ss a O")); | |
System.out.println(formattedDate); | |
} | |
System.out.print("."); | |
} | |
} | |
} | |
/** | |
* The resume token has the following structure: | |
* <p> | |
* 1. It's a document containing a single field named "_data" whose value is a string | |
* 2. The string is hex-encoded | |
* 3. The first byte is the "canonical type" for a BSON timestamp, encoded as an unsigned byte. It should always equal 130. | |
* 4. The next 8 bytes are the BSON timestamp representing the operation time, encoded as an unsigned long value with big endian byte | |
* order (unlike BSON itself, which is little endian). The {@link BsonTimestamp} class contains the logic for pulling out the | |
* seconds since the epoch from that value. See <a href="http://bsonspec.org">http://bsonspec.org</a> for details. | |
* | |
* @param resumeToken a BSonDocument containing the resume token | |
* @return the operation time contained within the resume token | |
*/ | |
static BsonTimestamp getTimestampFromResumeToken(BsonDocument resumeToken) { | |
var hexString = resumeToken.getString("_data").getValue(); | |
var bytes = HexFormat.of().parseHex(hexString); | |
var byteBuffer = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN); | |
// cast to an int then remove the sign bit to get the unsigned value | |
int canonicalType = ((int) byteBuffer.get()) & 0xff; | |
if (canonicalType != 130) { | |
throw new IllegalArgumentException("Expected canonical type equal to 130, but found " + canonicalType); | |
} | |
var timestampAsLong = byteBuffer.asLongBuffer().get(); | |
return new BsonTimestamp(timestampAsLong); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
There is actually more data in the resume token, but the timestamp happens to be first and the rest can be ignored. It's equivalent semantically to a BSON document like this: