@jyemin
Created July 20, 2022 01:05
import com.mongodb.client.MongoClients;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.Document;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HexFormat;
import java.util.Objects;

import static java.time.Instant.ofEpochSecond;

// Requires Java 17+ (java.util.HexFormat). Change streams are only available on
// replica sets and sharded clusters, not on standalone servers.
public class ResumeTokenDemo {
    public static void main(String[] args) throws InterruptedException {
        var client = MongoClients.create("mongodb://localhost/?directConnection=false");
        var coll = client.getDatabase("test").getCollection("test");
        coll.insertOne(new Document());
        BsonDocument curResumeToken = null;
        try (var cursor = coll.watch().cursor()) {
            while (true) {
                // tryNext() returns null when no event is available instead of blocking indefinitely like next()
                var nextEvent = cursor.tryNext();
                if (nextEvent != null) {
                    System.out.println(nextEvent.getDocumentKey());
                    System.out.println();
                }
                // The resume token may advance even when no event was returned, so check it on every iteration
                var latestResumeToken = cursor.getResumeToken();
                if (!Objects.equals(latestResumeToken, curResumeToken)) {
                    curResumeToken = latestResumeToken;
                    System.out.println();
                    var latestOperationTime = getTimestampFromResumeToken(Objects.requireNonNull(curResumeToken));
                    var formattedDate = ZonedDateTime
                            .ofInstant(ofEpochSecond(latestOperationTime.getTime()), ZoneId.of(ZoneId.SHORT_IDS.get("EST")))
                            .format(DateTimeFormatter.ofPattern("u-M-d hh:mm:ss a O"));
                    System.out.println(formattedDate);
                }
                System.out.print(".");
            }
        }
    }

    /**
     * The resume token has the following structure:
     * <p>
     * 1. It's a document containing a single field named "_data" whose value is a string
     * 2. The string is hex-encoded
     * 3. The first byte is the "canonical type" for a BSON timestamp, encoded as an unsigned byte. It should always equal 130.
     * 4. The next 8 bytes are the BSON timestamp representing the operation time, encoded as an unsigned long value with big endian byte
     * order (unlike BSON itself, which is little endian). The {@link BsonTimestamp} class contains the logic for pulling out the
     * seconds since the epoch from that value. See <a href="http://bsonspec.org">http://bsonspec.org</a> for details.
     *
     * @param resumeToken a BsonDocument containing the resume token
     * @return the operation time contained within the resume token
     */
    static BsonTimestamp getTimestampFromResumeToken(BsonDocument resumeToken) {
        var hexString = resumeToken.getString("_data").getValue();
        var bytes = HexFormat.of().parseHex(hexString);
        var byteBuffer = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN);
        // Java bytes are signed: widen to int, then mask with 0xff to get the unsigned value (0-255)
        int canonicalType = ((int) byteBuffer.get()) & 0xff;
        if (canonicalType != 130) {
            throw new IllegalArgumentException("Expected canonical type equal to 130, but found " + canonicalType);
        }
        // Read the next 8 bytes as a big-endian long
        var timestampAsLong = byteBuffer.getLong();
        return new BsonTimestamp(timestampAsLong);
    }
}
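To check the layout described in the Javadoc without a driver on the classpath, here is a minimal sketch that decodes only the leading timestamp bytes with the JDK alone. It assumes, as the Javadoc states, that the `_data` string starts with the canonical-type byte 130 (0x82) followed by an 8-byte big-endian timestamp. The class and method names are hypothetical, and the hex parsing is hand-rolled so the sketch also compiles on JDKs older than 17, where `HexFormat` is unavailable.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper (not part of the MongoDB driver): decodes the operation time
// from a change stream resume token's "_data" hex string using only the JDK.
final class ResumeTokenTimestamps {

    // Canonical type byte for a BSON timestamp, per the Javadoc above (130 == 0x82).
    static final int TIMESTAMP_CANONICAL_TYPE = 130;

    private ResumeTokenTimestamps() {}

    /** Returns {seconds since the epoch, increment} read from the token's leading bytes. */
    static long[] decode(String data) {
        // Only 9 bytes (18 hex characters) are needed: 1 type byte + 8 timestamp bytes.
        if (data.length() < 18) {
            throw new IllegalArgumentException("Resume token too short: " + data);
        }
        byte[] prefix = new byte[9];
        for (int i = 0; i < prefix.length; i++) {
            int hi = Character.digit(data.charAt(2 * i), 16);
            int lo = Character.digit(data.charAt(2 * i + 1), 16);
            if (hi < 0 || lo < 0) {
                throw new IllegalArgumentException("Not a hex string: " + data);
            }
            prefix[i] = (byte) ((hi << 4) | lo);
        }
        ByteBuffer buffer = ByteBuffer.wrap(prefix).order(ByteOrder.BIG_ENDIAN);
        // Java bytes are signed, so mask with 0xff to read the type as 0..255.
        int canonicalType = buffer.get() & 0xff;
        if (canonicalType != TIMESTAMP_CANONICAL_TYPE) {
            throw new IllegalArgumentException("Expected canonical type 130, but found " + canonicalType);
        }
        long timestamp = buffer.getLong();
        // High 32 bits = seconds, low 32 bits = increment (the split that
        // BsonTimestamp's getTime() and getInc() expose).
        long seconds = timestamp >>> 32;
        long increment = timestamp & 0xFFFFFFFFL;
        return new long[] {seconds, increment};
    }

    public static void main(String[] args) {
        long[] ts = decode(args.length > 0 ? args[0] : "82612E851300000001");
        System.out.println("t=" + ts[0] + ", i=" + ts[1]);
    }
}
```

Passing the `_data` value of a real resume token as the program argument should print the same seconds value that `getTimestampFromResumeToken(...).getTime()` returns, provided the token layout matches the one described above. Any bytes after the first nine are deliberately ignored.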
jyemin commented Jul 20, 2022

There is actually more data in the resume token, but the timestamp happens to come first, so the rest can be ignored when all you need is the operation time. Semantically, the token is equivalent to a BSON document like this:

{
  timestamp: new Timestamp({ t: 1630438675, i: 1 }),
  version: 1,
  tokenType: 128,
  txnOpIndex: 0,
  fromInvalidate: false,
  uuid: new UUID("a5093abb-38fe-4b9e-a67f-01bb1a96d812"),
  documentKey: { _id: '___x' }
}
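To connect the document above to the byte layout in the Javadoc, the sketch below packs the sample timestamp `{ t: 1630438675, i: 1 }` into the 64-bit BSON timestamp value (seconds in the high 32 bits, increment in the low 32 bits) and prints the hex prefix that a token carrying this operation time would start with, assuming the 0x82 type byte described in the Javadoc. The class name is hypothetical, and only the leading type byte and timestamp are modeled; the remaining fields (`version`, `tokenType`, `uuid`, and so on) follow them and are not reproduced here.

```java
import java.time.Instant;
import java.util.Locale;

// Hypothetical worked example: how the sample timestamp { t: 1630438675, i: 1 }
// would appear at the start of a resume token's "_data" string.
final class ResumeTokenPrefixExample {

    private ResumeTokenPrefixExample() {}

    /** Packs seconds and increment into the 64-bit BSON timestamp value. */
    static long pack(long seconds, long increment) {
        return (seconds << 32) | (increment & 0xFFFFFFFFL);
    }

    /** Canonical type 130 (0x82) followed by the 8-byte timestamp in big-endian hex. */
    static String tokenPrefix(long seconds, long increment) {
        return String.format(Locale.ROOT, "%02X%016X", 130, pack(seconds, increment));
    }

    public static void main(String[] args) {
        long t = 1630438675L; // 0x612E8513
        long i = 1L;
        System.out.println(tokenPrefix(t, i));        // 82612E851300000001
        System.out.println(Instant.ofEpochSecond(t)); // 2021-08-31T19:37:55Z
    }
}
```

Reading the prefix back with the big-endian decoding in `getTimestampFromResumeToken` yields the same `t` and `i`, which is why the fields after the timestamp never need to be parsed.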