Created
August 9, 2022 22:02
-
-
Save jyemin/e6c7e377e719d366b6b549d8f3564399 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.mongodb.client.MongoClients; | |
import org.bson.BsonDocument; | |
import org.bson.BsonTimestamp; | |
import org.bson.Document; | |
import java.nio.ByteBuffer; | |
import java.nio.ByteOrder; | |
import java.time.ZoneId; | |
import java.time.ZonedDateTime; | |
import java.time.format.DateTimeFormatter; | |
import java.util.HexFormat; | |
import java.util.Objects; | |
import static java.time.Instant.ofEpochSecond; | |
public class ResumeTokenDemo { | |
public static void main(String[] args) throws InterruptedException { | |
var client = MongoClients.create("mongodb://localhost/?directConnection=false"); | |
var coll = client.getDatabase("test").getCollection("test"); | |
coll.insertOne(new Document()); | |
BsonDocument curResumeToken = null; | |
try (var cursor = coll.watch().cursor()) { | |
while (true) { | |
var nextEvent = cursor.tryNext(); | |
if (nextEvent != null) { | |
System.out.println(); | |
System.out.println("Event document key: " + nextEvent.getDocumentKey()); | |
} | |
var latestResumeToken = cursor.getResumeToken(); | |
if (!Objects.equals(latestResumeToken, curResumeToken)) { | |
curResumeToken = latestResumeToken; | |
var latestOperationTime = getTimestampFromResumeToken(Objects.requireNonNull(curResumeToken)); | |
var formattedDate = ZonedDateTime.ofInstant(ofEpochSecond(latestOperationTime.getTime()), ZoneId.of(ZoneId.SHORT_IDS.get("EST"))) | |
.format(DateTimeFormatter.ofPattern("u-M-d hh:mm:ss a O")); | |
System.out.println(); | |
System.out.println("Resume token operation time: " + formattedDate); | |
} | |
System.out.print("."); | |
} | |
} | |
} | |
/** | |
* The resume token has the following structure: | |
* <p> | |
* 1. It's a document containing a single field named "_data" whose value is a string | |
* 2. The string is hex-encoded | |
* 3. The first byte is the "canonical type" for a BSON timestamp, encoded as an unsigned byte. It should always equal 130. | |
* 4. The next 8 bytes are the BSON timestamp representing the operation time, encoded as an unsigned long value with big endian byte | |
* order (unlike BSON itself, which is little endian). The {@link BsonTimestamp} class contains the logic for pulling out the | |
* seconds since the epoch from that value. See <a href="http://bsonspec.org">http://bsonspec.org</a> for details. | |
* 5. There are more fields encoded in the resume token but we don't need to decode them as operation time is always first | |
* @param resumeToken a BSonDocument containing the resume token | |
* @return the operation time contained within the resume token | |
*/ | |
static BsonTimestamp getTimestampFromResumeToken(BsonDocument resumeToken) { | |
if (!resumeToken.containsKey("_data")) { | |
throw new IllegalArgumentException("Expected _data field in resume token"); | |
} | |
var data = resumeToken.get("_data"); | |
byte[] bytes; | |
if (data.isString()) { | |
// 4.2 servers encode _data as a hex string | |
var hexString = data.asString().getValue(); | |
bytes = HexFormat.of().parseHex(hexString); | |
} else if (data.isBinary()) { | |
// 3.6 and 4.0 servers encode _data as binary | |
bytes = data.asBinary().getData(); | |
} else { | |
throw new IllegalArgumentException("Expected binary or string for _data field in resume token but found " + data.getBsonType()); | |
} | |
var byteBuffer = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN); | |
// cast to an int then remove the sign bit to get the unsigned value | |
int canonicalType = ((int) byteBuffer.get()) & 0xff; | |
if (canonicalType != 130) { | |
throw new IllegalArgumentException("Expected canonical type equal to 130, but found " + canonicalType); | |
} | |
var timestampAsLong = byteBuffer.asLongBuffer().get(); | |
return new BsonTimestamp(timestampAsLong); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment