Skip to content

Instantly share code, notes, and snippets.

@jyemin
Created July 20, 2022 14:31
Show Gist options
  • Save jyemin/c3d7b1eabebaac78f7cfa0250eadfd93 to your computer and use it in GitHub Desktop.
Save jyemin/c3d7b1eabebaac78f7cfa0250eadfd93 to your computer and use it in GitHub Desktop.
This version is better: it handles resume tokens generated by MongoDB 3.6/4.0 servers (binary `_data`) as well as newer servers (hex-string `_data`).
import com.mongodb.client.MongoClients;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.Document;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HexFormat;
import java.util.Objects;
import static java.time.Instant.ofEpochSecond;
public class ResumeTokenDemo {

    /** Canonical type byte that prefixes the BSON timestamp in a resume token's {@code _data} payload. */
    private static final int TIMESTAMP_CANONICAL_TYPE = 130;

    /** Minimum payload size needed to read the operation time: one type byte plus an 8-byte timestamp. */
    private static final int TIMESTAMP_PREFIX_LENGTH = 1 + Long.BYTES;

    /**
     * Zone used for display. {@code ZoneId.SHORT_IDS} maps "EST" to the fixed offset "-05:00", so this is
     * NOT adjusted for daylight saving time.
     */
    private static final ZoneId DISPLAY_ZONE = ZoneId.of(ZoneId.SHORT_IDS.get("EST"));

    /** Immutable and thread-safe, so it is built once instead of on every resume-token change. */
    private static final DateTimeFormatter DISPLAY_FORMAT = DateTimeFormatter.ofPattern("u-M-d hh:mm:ss a O");

    /**
     * Inserts a document into {@code test.test}, then watches that collection forever.
     * It prints the document key of each change event and, whenever the cursor's resume token changes,
     * the operation time decoded from that token. A "." is printed on every poll.
     *
     * @param args unused
     * @throws InterruptedException declared for compatibility; not thrown by the current body
     */
    public static void main(String[] args) throws InterruptedException {
        // The client owns connection pools and background threads, so close it along with the cursor.
        try (var client = MongoClients.create("mongodb://localhost/?directConnection=false")) {
            var coll = client.getDatabase("test").getCollection("test");
            coll.insertOne(new Document());
            BsonDocument curResumeToken = null;
            try (var cursor = coll.watch().cursor()) {
                while (true) {
                    var nextEvent = cursor.tryNext();
                    if (nextEvent != null) {
                        System.out.println();
                        System.out.println("Event document key: " + nextEvent.getDocumentKey());
                    }
                    // getResumeToken() may return null (e.g. before the first batch), so skip until one exists.
                    var latestResumeToken = cursor.getResumeToken();
                    if (latestResumeToken != null && !latestResumeToken.equals(curResumeToken)) {
                        curResumeToken = latestResumeToken;
                        var latestOperationTime = getTimestampFromResumeToken(curResumeToken);
                        // BSON timestamp seconds are an unsigned 32-bit value, but getTime() returns a signed int;
                        // widen without sign extension so times after 2038-01-19 are not rendered as pre-1970.
                        var operationInstant = ofEpochSecond(Integer.toUnsignedLong(latestOperationTime.getTime()));
                        var formattedDate = ZonedDateTime.ofInstant(operationInstant, DISPLAY_ZONE).format(DISPLAY_FORMAT);
                        System.out.println();
                        System.out.println("Resume token operation time: " + formattedDate);
                    }
                    System.out.print(".");
                }
            }
        }
    }

    /**
     * Extracts the operation time from a change stream resume token.
     * <p>
     * The resume token has the following structure:
     * <ol>
     *   <li>It is a document containing a field named "_data". On 4.2+ servers its value is a hex-encoded
     *       string; on 3.6 and 4.0 servers it is BSON binary.</li>
     *   <li>The first byte of the decoded payload is the "canonical type" for a BSON timestamp, read as an
     *       unsigned byte. It must equal 130.</li>
     *   <li>The next 8 bytes are the BSON timestamp representing the operation time. They are encoded as an
     *       unsigned long in big-endian byte order (unlike BSON itself, which is little endian). The
     *       {@link BsonTimestamp} class contains the logic for pulling out the seconds since the epoch from
     *       that value. See <a href="http://bsonspec.org">http://bsonspec.org</a> for details.</li>
     *   <li>More fields follow, but they are not decoded here because the operation time is always first.</li>
     * </ol>
     *
     * @param resumeToken a BsonDocument containing the resume token
     * @return the operation time contained within the resume token
     * @throws NullPointerException if {@code resumeToken} is null
     * @throws IllegalArgumentException if "_data" is missing, is neither a string nor binary, is not valid hex,
     *         is shorter than 9 bytes, or does not start with canonical type 130
     */
    static BsonTimestamp getTimestampFromResumeToken(BsonDocument resumeToken) {
        Objects.requireNonNull(resumeToken, "resumeToken");
        if (!resumeToken.containsKey("_data")) {
            throw new IllegalArgumentException("Expected _data field in resume token");
        }
        var data = resumeToken.get("_data");
        byte[] bytes;
        if (data.isString()) {
            // 4.2+ servers encode _data as a hex string
            bytes = HexFormat.of().parseHex(data.asString().getValue());
        } else if (data.isBinary()) {
            // 3.6 and 4.0 servers encode _data as binary
            bytes = data.asBinary().getData();
        } else {
            throw new IllegalArgumentException("Expected binary or string for _data field in resume token but found " + data.getBsonType());
        }
        // Report a truncated token as a bad argument instead of leaking a BufferUnderflowException.
        if (bytes.length < TIMESTAMP_PREFIX_LENGTH) {
            throw new IllegalArgumentException("Expected at least " + TIMESTAMP_PREFIX_LENGTH
                    + " bytes in _data field of resume token, but found " + bytes.length);
        }
        var byteBuffer = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN);
        int canonicalType = Byte.toUnsignedInt(byteBuffer.get());
        if (canonicalType != TIMESTAMP_CANONICAL_TYPE) {
            throw new IllegalArgumentException("Expected canonical type equal to " + TIMESTAMP_CANONICAL_TYPE
                    + ", but found " + canonicalType);
        }
        // Reads the 8 bytes right after the type byte, honoring the buffer's big-endian order.
        return new BsonTimestamp(byteBuffer.getLong());
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment