Skip to content

Instantly share code, notes, and snippets.

@rdblue
Last active October 28, 2024 15:41
Show Gist options
  • Save rdblue/7969f87d6e809cbd8d8b2b5af6543734 to your computer and use it in GitHub Desktop.
Save rdblue/7969f87d6e809cbd8d8b2b5af6543734 to your computer and use it in GitHub Desktop.
Spec-based Java implementation of Variant
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing,
* * software distributed under the License is distributed on an
* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* * KIND, either express or implied. See the License for the
* * specific language governing permissions and limitations
* * under the License.
*
*/
package org.apache.iceberg;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.Pair;
public class Variants {
/**
 * Dictionary of strings that variant values refer to by integer id.
 *
 * <p>NOTE(review): a {@code bufferSize()} accessor was sketched here as a comment but is not part
 * of the interface.
 */
public interface Metadata {
  /**
   * Returns the string stored under the given dictionary id.
   *
   * @param id a dictionary id
   */
  String get(int id);

  /**
   * Returns the dictionary id of the given string. The implementation in this file returns -1
   * when the string is not present.
   *
   * @param name the string to look up
   */
  int id(String name);

  /** Returns this metadata as a byte buffer. */
  ByteBuffer serialize();
}
/**
 * A single variant value.
 *
 * <p>NOTE(review): a {@code bufferSize()} accessor was sketched here as a comment but is not part
 * of the interface.
 */
public interface Value {
  /** Returns this value as a byte buffer. */
  ByteBuffer serialize();

  /** Returns the physical type of this value. */
  PhysicalType type();
}
/**
 * A variant value that exposes a single Java object.
 *
 * @param <T> the Java type returned by {@link #get()}
 */
public interface PrimitiveVariant<T> extends Value {
/** Returns the Java object held by this value. */
T get();
}
public interface ObjectVariant extends Value {
Value get(String field);
default PhysicalType type() {
return PhysicalType.OBJECT;
}
}
public interface ArrayVariant extends Value {
Value get(int index);
default PhysicalType type() {
return PhysicalType.ARRAY;
}
}
/**
 * Numeric ids of the primitive types. PhysicalType.from(int) maps each id to its enum constant;
 * VariantPrimitive obtains the id by shifting the header byte right by two bits.
 */
private static class Primitives {
private static final int TYPE_NULL = 0;
private static final int TYPE_TRUE = 1;
private static final int TYPE_FALSE = 2;
private static final int TYPE_INT8 = 3;
private static final int TYPE_INT16 = 4;
private static final int TYPE_INT32 = 5;
private static final int TYPE_INT64 = 6;
private static final int TYPE_DOUBLE = 7;
private static final int TYPE_DECIMAL4 = 8;
private static final int TYPE_DECIMAL8 = 9;
private static final int TYPE_DECIMAL16 = 10;
private static final int TYPE_DATE = 11;
private static final int TYPE_TIMESTAMPTZ = 12; // equivalent to timestamptz
private static final int TYPE_TIMESTAMPNTZ = 13; // equivalent to timestamp
private static final int TYPE_FLOAT = 14;
private static final int TYPE_BINARY = 15;
private static final int TYPE_STRING = 16;
// constants holder; never instantiated
private Primitives() {}
}
/**
 * Logical categories of variant values. Several physical types can share one logical type: in
 * PhysicalType, BOOLEAN_TRUE and BOOLEAN_FALSE both map to BOOLEAN, and the INT* and DECIMAL*
 * types all map to EXACT_NUMERIC.
 */
public enum LogicalType {
NULL,
BOOLEAN,
EXACT_NUMERIC,
FLOAT,
DOUBLE,
DATE,
TIMESTAMPTZ,
TIMESTAMPNTZ,
BINARY,
STRING,
ARRAY,
OBJECT
}
/**
 * Physical encodings of variant values. Each constant is declared with the logical type it
 * belongs to and a Java class (presumably the class of the decoded value; confirm against the
 * readers).
 */
public enum PhysicalType {
  NULL(LogicalType.NULL, Void.class),
  BOOLEAN_TRUE(LogicalType.BOOLEAN, Boolean.class),
  BOOLEAN_FALSE(LogicalType.BOOLEAN, Boolean.class),
  INT8(LogicalType.EXACT_NUMERIC, Integer.class),
  INT16(LogicalType.EXACT_NUMERIC, Integer.class),
  INT32(LogicalType.EXACT_NUMERIC, Integer.class),
  INT64(LogicalType.EXACT_NUMERIC, Long.class),
  DOUBLE(LogicalType.DOUBLE, Double.class),
  DECIMAL4(LogicalType.EXACT_NUMERIC, BigDecimal.class),
  DECIMAL8(LogicalType.EXACT_NUMERIC, BigDecimal.class),
  DECIMAL16(LogicalType.EXACT_NUMERIC, BigDecimal.class),
  DATE(LogicalType.DATE, Integer.class),
  TIMESTAMPTZ(LogicalType.TIMESTAMPTZ, Long.class),
  TIMESTAMPNTZ(LogicalType.TIMESTAMPNTZ, Long.class),
  FLOAT(LogicalType.FLOAT, Float.class),
  BINARY(LogicalType.BINARY, ByteBuffer.class),
  STRING(LogicalType.STRING, String.class),
  ARRAY(LogicalType.ARRAY, List.class),
  OBJECT(LogicalType.OBJECT, Map.class);

  private final LogicalType logical;
  private final Class<?> valueClass;

  PhysicalType(LogicalType logical, Class<?> valueClass) {
    this.logical = logical;
    this.valueClass = valueClass;
  }

  /** Returns the logical type this physical type was declared with. */
  public LogicalType toLogicalType() {
    return logical;
  }

  /** Returns the Java class this physical type was declared with. */
  public Class<?> javaClass() {
    return valueClass;
  }

  /**
   * Maps a primitive type id to its physical type.
   *
   * @param primitiveType one of the ids declared in {@code Primitives}
   * @return the matching physical type
   * @throws UnsupportedOperationException if the id is not a known primitive type
   */
  public static PhysicalType from(int primitiveType) {
    PhysicalType result;
    // cases are listed in ascending id order
    switch (primitiveType) {
      case Primitives.TYPE_NULL:
        result = NULL;
        break;
      case Primitives.TYPE_TRUE:
        result = BOOLEAN_TRUE;
        break;
      case Primitives.TYPE_FALSE:
        result = BOOLEAN_FALSE;
        break;
      case Primitives.TYPE_INT8:
        result = INT8;
        break;
      case Primitives.TYPE_INT16:
        result = INT16;
        break;
      case Primitives.TYPE_INT32:
        result = INT32;
        break;
      case Primitives.TYPE_INT64:
        result = INT64;
        break;
      case Primitives.TYPE_DOUBLE:
        result = DOUBLE;
        break;
      case Primitives.TYPE_DECIMAL4:
        result = DECIMAL4;
        break;
      case Primitives.TYPE_DECIMAL8:
        result = DECIMAL8;
        break;
      case Primitives.TYPE_DECIMAL16:
        result = DECIMAL16;
        break;
      case Primitives.TYPE_DATE:
        result = DATE;
        break;
      case Primitives.TYPE_TIMESTAMPTZ:
        result = TIMESTAMPTZ;
        break;
      case Primitives.TYPE_TIMESTAMPNTZ:
        result = TIMESTAMPNTZ;
        break;
      case Primitives.TYPE_FLOAT:
        result = FLOAT;
        break;
      case Primitives.TYPE_BINARY:
        result = BINARY;
        break;
      case Primitives.TYPE_STRING:
        result = STRING;
        break;
      default:
        throw new UnsupportedOperationException(
            "Unknown primitive physical type: " + primitiveType);
    }
    return result;
  }
}
/**
 * Helpers for reading values out of a variant buffer.
 *
 * <p>Every {@code offset} is relative to the buffer's current position, and no method changes
 * the position or limit of the buffer it is given. Multi-byte reads use the buffer's configured
 * byte order; the readers in this file check that it is little endian before calling in.
 */
static class VariantUtil {
  private VariantUtil() {}

  /** Returns the byte at {@code offset} as an unsigned value in the range 0 to 255. */
  static int readByte(ByteBuffer buffer, int offset) {
    return buffer.get(buffer.position() + offset) & 0xFF;
  }

  /**
   * Reads an integer stored in {@code size} bytes.
   *
   * <p>Sizes 1 and 2 are returned sign-extended (INT16 decoding relies on this), so callers that
   * read unsigned counts or offsets must mask the result. Size 3 is returned as an unsigned
   * 24-bit value, and size 4 as a plain {@code int}.
   *
   * @param buffer the buffer to read from
   * @param offset offset of the first byte, relative to the buffer's position
   * @param size number of bytes to read, 1 to 4
   * @return the decoded integer
   * @throws IllegalArgumentException if {@code size} is not between 1 and 4
   */
  static int readLittleEndian(ByteBuffer buffer, int offset, int size) {
    int base = buffer.position() + offset;
    switch (size) {
      case 4:
        return buffer.getInt(base);
      case 3:
        // assemble byte by byte: OR-ing a sign-extended short would set bits 16-31 whenever the
        // second byte has its high bit set
        return (buffer.get(base) & 0xFF)
            | ((buffer.get(base + 1) & 0xFF) << 8)
            | ((buffer.get(base + 2) & 0xFF) << 16);
      case 2:
        return buffer.getShort(base);
      case 1:
        return buffer.get(base);
      default:
        throw new IllegalArgumentException("Invalid size: " + size);
    }
  }

  /** Reads a 4-byte int at {@code offset}. */
  static int readLittleEndianInt(ByteBuffer buffer, int offset) {
    return buffer.getInt(buffer.position() + offset);
  }

  /** Reads an 8-byte long at {@code offset}. */
  static long readLittleEndianLong(ByteBuffer buffer, int offset) {
    return buffer.getLong(buffer.position() + offset);
  }

  /** Reads a 4-byte float at {@code offset}. */
  static float readFloat(ByteBuffer buffer, int offset) {
    return buffer.getFloat(buffer.position() + offset);
  }

  /** Reads an 8-byte double at {@code offset}. */
  static double readDouble(ByteBuffer buffer, int offset) {
    return buffer.getDouble(buffer.position() + offset);
  }

  /**
   * Returns a little-endian view of {@code length} bytes starting at {@code offset}. The view
   * shares content with {@code buffer} but has its own position and limit.
   */
  static ByteBuffer slice(ByteBuffer buffer, int offset, int length) {
    int start = buffer.position() + offset;
    // duplicate() does not carry over the byte order, so it is set again explicitly
    ByteBuffer slice = buffer.duplicate();
    slice.order(ByteOrder.LITTLE_ENDIAN);
    slice.position(start);
    slice.limit(start + length);
    return slice;
  }

  /** Decodes {@code length} bytes starting at {@code offset} as a UTF-8 string. */
  static String readString(ByteBuffer buffer, int offset, int length) {
    if (buffer.hasArray()) {
      // decode straight from the backing array, avoiding an intermediate buffer
      return new String(
          buffer.array(),
          buffer.arrayOffset() + buffer.position() + offset,
          length,
          StandardCharsets.UTF_8);
    } else {
      return StandardCharsets.UTF_8.decode(slice(buffer, offset, length)).toString();
    }
  }

  /**
   * Binary search over {@code size} positions whose values are produced by {@code resolve}.
   *
   * @param size number of positions to search
   * @param key the value to find; must not be null
   * @param resolve returns the value at a position; values must be in ascending order for the
   *     search to be meaningful
   * @return a position whose value compares equal to {@code key}, or -1 if there is none
   */
  static <T extends Comparable<T>> int find(int size, T key, Function<Integer, T> resolve) {
    int low = 0;
    int high = size - 1;
    while (low <= high) {
      // unsigned shift keeps the midpoint correct even if low + high overflows
      int mid = (low + high) >>> 1;
      T value = resolve.apply(mid);
      int cmp = key.compareTo(value);
      if (cmp == 0) {
        return mid;
      } else if (cmp < 0) {
        high = mid - 1;
      } else {
        low = mid + 1;
      }
    }
    return -1;
  }
}
/**
 * Serializes a value as a pair of (metadata buffer, value buffer).
 *
 * <p>Objects and arrays contribute the metadata they were created with; any other value is paired
 * with {@code VariantMetadata.EMPTY_V1_BUFFER}.
 *
 * @param value the value to serialize; objects and arrays must be the {@code VariantObject} and
 *     {@code VariantArray} implementations from this file, because they are cast to them
 * @return the metadata buffer paired with the value buffer
 */
public static Pair<ByteBuffer, ByteBuffer> serialize(Value value) {
  ByteBuffer metadataBuffer;
  switch (value.type()) {
    case OBJECT:
      metadataBuffer = ((VariantObject) value).metadata.serialize();
      break;
    case ARRAY:
      metadataBuffer = ((VariantArray) value).metadata.serialize();
      break;
    default:
      metadataBuffer = VariantMetadata.EMPTY_V1_BUFFER;
      break;
  }

  return Pair.of(metadataBuffer, value.serialize());
}
// Each metadata and value buffer starts with a one-byte header.
private static final int HEADER_SIZE = 1;
// The low two bits of a value header hold the basic type, which selects the reader to use.
private static final int BASIC_TYPE_MASK = 0b11;
private static final int BASIC_TYPE_PRIMITIVE = 0;
private static final int BASIC_TYPE_SHORT_STRING = 1;
private static final int BASIC_TYPE_OBJECT = 2;
private static final int BASIC_TYPE_ARRAY = 3;
/**
 * Creates a variant value from serialized metadata and value buffers.
 *
 * @param metadata buffer holding the serialized metadata
 * @param value buffer holding the serialized value
 * @return a value backed by the given buffers
 */
public static Value from(ByteBuffer metadata, ByteBuffer value) {
  VariantMetadata parsedMetadata = VariantMetadata.from(metadata);
  return from(parsedMetadata, value);
}
/**
 * Creates a variant value, choosing the reader from the basic type stored in the low two bits of
 * the header byte.
 *
 * @param metadata metadata handed to object and array values
 * @param value buffer holding the serialized value; its first byte is the header
 * @return a value backed by {@code value}
 * @throws UnsupportedOperationException if the basic type is not recognized
 */
private static Value from(VariantMetadata metadata, ByteBuffer value) {
  int header = VariantUtil.readByte(value, 0);
  int basicType = header & BASIC_TYPE_MASK;
  switch (basicType) {
    case BASIC_TYPE_PRIMITIVE:
      return VariantPrimitive.from(value, header);
    case BASIC_TYPE_SHORT_STRING:
      return VariantShortString.from(value, header);
    case BASIC_TYPE_OBJECT:
      return VariantObject.from(metadata, value, header);
    case BASIC_TYPE_ARRAY:
      return VariantArray.from(metadata, value, header);
    default:
      // the message is built by concatenation, so it must not contain a format placeholder
      throw new UnsupportedOperationException("Unsupported basic type: " + basicType);
  }
}
/**
 * Metadata backed by a serialized buffer laid out as a one-byte header, the dictionary size, a
 * list of (size + 1) string offsets, and the string bytes. Strings are decoded on first access
 * and cached in an array.
 */
static class VariantMetadata implements Metadata {
  private static final int SUPPORTED_VERSION = 1;
  // header bits: 0-3 version, 4 sorted-strings flag, 6-7 offset size minus one
  private static final int VERSION_MASK = 0b1111;
  private static final int SORTED_STRINGS = 0b10000;
  private static final int RESERVED = 0b100000;
  private static final int OFFSET_SIZE_MASK = 0b11000000;
  private static final int OFFSET_SIZE_SHIFT = 6;

  // Version 1, one-byte offsets, zero strings, followed by the single end offset: the offset list
  // always holds (dictionary size + 1) entries, so an empty dictionary still has one.
  static final ByteBuffer EMPTY_V1_BUFFER =
      ByteBuffer.wrap(new byte[] {0x01, 0x00, 0x00}).order(ByteOrder.LITTLE_ENDIAN);

  /** Creates metadata from a byte array by wrapping it in a little-endian buffer. */
  static VariantMetadata from(byte[] bytes) {
    return from(ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN));
  }

  /**
   * Creates metadata from a buffer.
   *
   * @param metadata a little-endian buffer positioned at the metadata header
   * @throws IllegalArgumentException if the buffer is not little endian or the version in the
   *     header is not supported
   */
  static VariantMetadata from(ByteBuffer metadata) {
    Preconditions.checkArgument(
        metadata.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian");
    int header = VariantUtil.readByte(metadata, 0);
    int version = header & VERSION_MASK;
    Preconditions.checkArgument(SUPPORTED_VERSION == version, "Unsupported version: %s", version);
    return new VariantMetadata(metadata, header);
  }

  private final ByteBuffer metadata;
  private final boolean isSorted;
  private final int offsetSize;
  private final int offsetListOffset;
  private final int dataOffset;
  private final String[] dict;

  private VariantMetadata(ByteBuffer metadata, int header) {
    this.metadata = metadata;
    this.isSorted = (header & SORTED_STRINGS) == SORTED_STRINGS;
    this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
    int dictSize = readUnsigned(metadata, HEADER_SIZE, offsetSize);
    this.dict = new String[dictSize];
    this.offsetListOffset = HEADER_SIZE + offsetSize;
    this.dataOffset = offsetListOffset + ((1 + dictSize) * offsetSize);
  }

  /**
   * Reads a count or offset stored in {@code size} bytes as an unsigned value. One- and two-byte
   * reads can come back sign-extended, so the bits above the stored width are cleared here.
   */
  private static int readUnsigned(ByteBuffer buffer, int offset, int size) {
    int raw = VariantUtil.readLittleEndian(buffer, offset, size);
    return size < Integer.BYTES ? raw & ((1 << (Byte.SIZE * size)) - 1) : raw;
  }

  @VisibleForTesting
  int dictionarySize() {
    return dict.length;
  }

  @VisibleForTesting
  boolean isSorted() {
    return isSorted;
  }

  /** Returns the position of the string in the metadata, or -1 if the string is not found. */
  @Override
  public int id(String name) {
    if (name != null) {
      if (isSorted) {
        // the header marks the strings as sorted, so a binary search is possible
        return VariantUtil.find(dict.length, name, this::get);
      } else {
        for (int id = 0; id < dict.length; id += 1) {
          if (name.equals(get(id))) {
            return id;
          }
        }
      }
    }
    return -1;
  }

  /** Returns the string for the given dictionary id. */
  @Override
  public String get(int index) {
    if (null == dict[index]) {
      // a string spans from its own offset to the offset of the next entry
      int offset = readUnsigned(metadata, offsetListOffset + (offsetSize * index), offsetSize);
      int next =
          readUnsigned(metadata, offsetListOffset + (offsetSize * (1 + index)), offsetSize);
      dict[index] = VariantUtil.readString(metadata, dataOffset + offset, next - offset);
    }
    return dict[index];
  }

  /** Returns the buffer this metadata was created from; the buffer is not copied. */
  @Override
  public ByteBuffer serialize() {
    return metadata;
  }
}
/**
 * A primitive value backed by a serialized buffer: a one-byte header whose upper six bits hold
 * the primitive type id, followed by the encoded value. The value is decoded on first access.
 */
static class VariantPrimitive implements PrimitiveVariant<Object> {
  private static final int PRIMITIVE_TYPE_SHIFT = 2;
  private static final int PRIMITIVE_OFFSET = HEADER_SIZE;
  private static final int DECIMAL16_UNSCALED_SIZE = 16;

  /** Creates a primitive from a byte array whose first byte is the header. */
  static VariantPrimitive from(byte[] bytes) {
    // mask so that a header byte of 0x80 or above is not sign-extended into a negative type id
    return from(ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0] & 0xFF);
  }

  private static VariantPrimitive from(ByteBuffer value, int header) {
    Preconditions.checkArgument(
        value.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian");
    int basicType = header & BASIC_TYPE_MASK;
    Preconditions.checkArgument(
        basicType == BASIC_TYPE_PRIMITIVE, "Invalid primitive, basic type != 0: " + basicType);
    return new VariantPrimitive(value, header);
  }

  private final ByteBuffer value;
  private final PhysicalType type;
  private Object primitive = null;

  private VariantPrimitive(ByteBuffer value, int header) {
    this.value = value;
    this.type = PhysicalType.from(header >> PRIMITIVE_TYPE_SHIFT);
  }

  /** Decodes the value that follows the header according to the physical type. */
  private Object read() {
    switch (type) {
      case NULL:
        return null;
      case BOOLEAN_TRUE:
        return true;
      case BOOLEAN_FALSE:
        return false;
      case INT8:
        // INT8 is signed: narrow to byte to recover the sign, then widen so the result stays an
        // Integer like the other small integer types
        return (int) (byte) VariantUtil.readByte(value, PRIMITIVE_OFFSET);
      case INT16:
        // narrowing to short keeps the value signed regardless of how the two bytes are widened
        return (int) (short) VariantUtil.readLittleEndian(value, PRIMITIVE_OFFSET, 2);
      case INT32:
      case DATE:
        return VariantUtil.readLittleEndianInt(value, PRIMITIVE_OFFSET);
      case INT64:
      case TIMESTAMPTZ:
      case TIMESTAMPNTZ:
        return VariantUtil.readLittleEndianLong(value, PRIMITIVE_OFFSET);
      case FLOAT:
        return VariantUtil.readFloat(value, PRIMITIVE_OFFSET);
      case DOUBLE:
        return VariantUtil.readDouble(value, PRIMITIVE_OFFSET);
      case DECIMAL4:
        {
          // decimals are a one-byte scale followed by the unscaled value
          int scale = VariantUtil.readByte(value, PRIMITIVE_OFFSET);
          int unscaled = VariantUtil.readLittleEndianInt(value, PRIMITIVE_OFFSET + 1);
          return new BigDecimal(BigInteger.valueOf(unscaled), scale);
        }
      case DECIMAL8:
        {
          int scale = VariantUtil.readByte(value, PRIMITIVE_OFFSET);
          long unscaled = VariantUtil.readLittleEndianLong(value, PRIMITIVE_OFFSET + 1);
          return new BigDecimal(BigInteger.valueOf(unscaled), scale);
        }
      case DECIMAL16:
        {
          int scale = VariantUtil.readByte(value, PRIMITIVE_OFFSET);
          // BigInteger takes big-endian two's complement, so copy the 16 little-endian bytes in
          // reverse order, starting from the last one
          byte[] unscaled = new byte[DECIMAL16_UNSCALED_SIZE];
          for (int i = 0; i < unscaled.length; i += 1) {
            unscaled[i] =
                (byte) VariantUtil.readByte(value, PRIMITIVE_OFFSET + unscaled.length - i);
          }
          return new BigDecimal(new BigInteger(unscaled), scale);
        }
      case BINARY:
        {
          // binary and string values are a 4-byte length followed by that many bytes
          int size = VariantUtil.readLittleEndianInt(value, PRIMITIVE_OFFSET);
          return VariantUtil.slice(value, PRIMITIVE_OFFSET + 4, size);
        }
      case STRING:
        {
          int size = VariantUtil.readLittleEndianInt(value, PRIMITIVE_OFFSET);
          return VariantUtil.readString(value, PRIMITIVE_OFFSET + 4, size);
        }
    }
    throw new UnsupportedOperationException("Unsupported primitive type: " + type);
  }

  @Override
  public PhysicalType type() {
    return type;
  }

  /**
   * Returns the decoded value, or null for the NULL type. Integer types up to 32 bits and DATE
   * are returned as Integer; INT64 and the timestamp types as Long; decimals as BigDecimal;
   * BINARY as a ByteBuffer view; STRING as String.
   */
  @Override
  public Object get() {
    if (null == primitive) {
      this.primitive = read();
    }
    return primitive;
  }

  /** Returns the buffer this value was created from; the buffer is not copied. */
  @Override
  public ByteBuffer serialize() {
    return value;
  }
}
/**
 * A short string backed by a serialized buffer: a one-byte header whose upper six bits hold the
 * length in bytes, followed by the UTF-8 bytes. The string is decoded on first access.
 */
static class VariantShortString implements PrimitiveVariant<String> {
  private static final int LENGTH_MASK = 0b11111100;
  private static final int LENGTH_SHIFT = 2;

  /** Creates a short string from a byte array whose first byte is the header. */
  static VariantShortString from(byte[] bytes) {
    ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
    return from(buffer, bytes[0]);
  }

  /**
   * Creates a short string from a buffer and its already-read header.
   *
   * @throws IllegalArgumentException if the buffer is not little endian or the header's basic
   *     type is not short string
   */
  static VariantShortString from(ByteBuffer value, int header) {
    boolean littleEndian = value.order() == ByteOrder.LITTLE_ENDIAN;
    Preconditions.checkArgument(littleEndian, "Unsupported byte order: big endian");

    int basicType = header & BASIC_TYPE_MASK;
    Preconditions.checkArgument(
        BASIC_TYPE_SHORT_STRING == basicType,
        "Invalid short string, basic type != 1: " + basicType);

    return new VariantShortString(value, header);
  }

  private final ByteBuffer value;
  private final int numBytes;
  private String decoded = null;

  private VariantShortString(ByteBuffer value, int header) {
    this.value = value;
    int lengthBits = header & LENGTH_MASK;
    this.numBytes = lengthBits >> LENGTH_SHIFT;
  }

  @Override
  public PhysicalType type() {
    return PhysicalType.STRING;
  }

  /** Returns the string, decoding it from the bytes after the header on first use. */
  @Override
  public String get() {
    if (decoded != null) {
      return decoded;
    }

    this.decoded = VariantUtil.readString(value, HEADER_SIZE, numBytes);
    return decoded;
  }

  /** Returns the buffer this value was created from; the buffer is not copied. */
  @Override
  public ByteBuffer serialize() {
    return value;
  }
}
/**
 * An object backed by a serialized buffer laid out as a one-byte header, the number of fields, a
 * list of field ids, a list of (fields + 1) value offsets, and the field values.
 */
static class VariantObject implements ObjectVariant {
  // Masks apply to the full header byte, whose bits 0-1 hold the basic type: bits 2-3 are the
  // offset size minus one, bits 4-5 the field id size minus one, and bit 6 the is_large flag.
  private static final int OFFSET_SIZE_MASK = 0b1100;
  private static final int OFFSET_SIZE_SHIFT = 2;
  private static final int FIELD_ID_SIZE_MASK = 0b110000;
  private static final int FIELD_ID_SIZE_SHIFT = 4;
  private static final int IS_LARGE = 0b1000000;

  /**
   * Creates an object from a buffer and its already-read header.
   *
   * @param metadata metadata used to resolve field ids to names
   * @param value a little-endian buffer positioned at the object header
   * @param header the header byte of {@code value}
   * @throws IllegalArgumentException if the buffer is not little endian or the header's basic
   *     type is not object
   */
  public static VariantObject from(VariantMetadata metadata, ByteBuffer value, int header) {
    Preconditions.checkArgument(
        value.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian");
    int basicType = header & BASIC_TYPE_MASK;
    Preconditions.checkArgument(
        basicType == BASIC_TYPE_OBJECT, "Invalid object, basic type != 2: " + basicType);
    return new VariantObject(metadata, value, header);
  }

  private final VariantMetadata metadata;
  private final ByteBuffer value;
  private final int numElements;
  private final int fieldIdSize;
  private final int fieldIdListOffset;
  private final int offsetSize;
  private final int offsetListOffset;
  private final int dataOffset;

  private VariantObject(VariantMetadata metadata, ByteBuffer value, int header) {
    this.metadata = metadata;
    this.value = value;
    this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
    this.fieldIdSize = 1 + ((header & FIELD_ID_SIZE_MASK) >> FIELD_ID_SIZE_SHIFT);
    // the field count takes four bytes when the is_large flag is set and one byte otherwise
    int numElementsSize = ((header & IS_LARGE) == IS_LARGE) ? 4 : 1;
    this.numElements = readUnsigned(value, HEADER_SIZE, numElementsSize);
    this.fieldIdListOffset = HEADER_SIZE + numElementsSize;
    this.offsetListOffset = fieldIdListOffset + (numElements * fieldIdSize);
    this.dataOffset = offsetListOffset + ((1 + numElements) * offsetSize);
  }

  /**
   * Reads a count, id or offset stored in {@code size} bytes as an unsigned value. One- and
   * two-byte reads can come back sign-extended, so the bits above the stored width are cleared.
   */
  private static int readUnsigned(ByteBuffer buffer, int offset, int size) {
    int raw = VariantUtil.readLittleEndian(buffer, offset, size);
    return size < Integer.BYTES ? raw & ((1 << (Byte.SIZE * size)) - 1) : raw;
  }

  /**
   * Returns the value of the named field, or null if {@code name} is null or the object has no
   * such field. The lookup is a binary search, which relies on the field ids being ordered
   * lexicographically by field name.
   */
  @Override
  public Value get(String name) {
    if (name == null) {
      return null;
    }

    int index =
        VariantUtil.find(
            numElements,
            name,
            pos -> {
              int id = readUnsigned(value, fieldIdListOffset + (pos * fieldIdSize), fieldIdSize);
              return metadata.get(id);
            });

    if (index < 0) {
      return null;
    }

    // a field value spans from its own offset to the next entry in the offset list
    int offset = readUnsigned(value, offsetListOffset + (index * offsetSize), offsetSize);
    int next = readUnsigned(value, offsetListOffset + ((1 + index) * offsetSize), offsetSize);
    return Variants.from(metadata, VariantUtil.slice(value, dataOffset + offset, next - offset));
  }

  /** Returns the buffer this object was created from; the buffer is not copied. */
  @Override
  public ByteBuffer serialize() {
    return value;
  }
}
/**
 * An array backed by a serialized buffer laid out as a one-byte header, the number of elements,
 * a list of (elements + 1) offsets, and the element values. Elements are decoded on first access
 * and cached.
 */
static class VariantArray implements ArrayVariant {
  // Masks apply to the full header byte, whose bits 0-1 hold the basic type: bits 2-3 are the
  // offset size minus one and bit 4 is the is_large flag.
  private static final int OFFSET_SIZE_MASK = 0b1100;
  private static final int OFFSET_SIZE_SHIFT = 2;
  private static final int IS_LARGE = 0b10000;

  @VisibleForTesting
  static VariantArray from(VariantMetadata metadata, byte[] bytes) {
    return from(metadata, ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]);
  }

  /**
   * Creates an array from a buffer and its already-read header.
   *
   * @param metadata metadata handed to the elements of the array
   * @param value a little-endian buffer positioned at the array header
   * @param header the header byte of {@code value}
   * @throws IllegalArgumentException if the buffer is not little endian or the header's basic
   *     type is not array
   */
  static VariantArray from(VariantMetadata metadata, ByteBuffer value, int header) {
    Preconditions.checkArgument(
        value.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian");
    int basicType = header & BASIC_TYPE_MASK;
    Preconditions.checkArgument(
        basicType == BASIC_TYPE_ARRAY, "Invalid array, basic type != 3: " + basicType);
    return new VariantArray(metadata, value, header);
  }

  private final VariantMetadata metadata;
  private final ByteBuffer value;
  private final int offsetSize;
  private final int offsetListOffset;
  private final int dataOffset;
  private final Value[] array;

  private VariantArray(VariantMetadata metadata, ByteBuffer value, int header) {
    this.metadata = metadata;
    this.value = value;
    this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
    // the element count takes four bytes when the is_large flag is set and one byte otherwise
    int numElementsSize = ((header & IS_LARGE) == IS_LARGE) ? 4 : 1;
    int numElements = readUnsigned(value, HEADER_SIZE, numElementsSize);
    this.offsetListOffset = HEADER_SIZE + numElementsSize;
    this.dataOffset = offsetListOffset + ((1 + numElements) * offsetSize);
    this.array = new Value[numElements];
  }

  /**
   * Reads a count or offset stored in {@code size} bytes as an unsigned value. One- and two-byte
   * reads can come back sign-extended, so the bits above the stored width are cleared here.
   */
  private static int readUnsigned(ByteBuffer buffer, int offset, int size) {
    int raw = VariantUtil.readLittleEndian(buffer, offset, size);
    return size < Integer.BYTES ? raw & ((1 << (Byte.SIZE * size)) - 1) : raw;
  }

  @VisibleForTesting
  int numElements() {
    return array.length;
  }

  /** Returns the element at the given position, decoding and caching it on first access. */
  @Override
  public Value get(int index) {
    if (null == array[index]) {
      // an element spans from its own offset to the next entry in the offset list
      int offset = readUnsigned(value, offsetListOffset + (offsetSize * index), offsetSize);
      int next = readUnsigned(value, offsetListOffset + (offsetSize * (1 + index)), offsetSize);
      array[index] =
          Variants.from(metadata, VariantUtil.slice(value, dataOffset + offset, next - offset));
    }
    return array[index];
  }

  /** Returns the buffer this array was created from; the buffer is not copied. */
  @Override
  public ByteBuffer serialize() {
    return value;
  }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment