Skip to content

Instantly share code, notes, and snippets.

@rdblue
Created August 3, 2020 16:35
Show Gist options
  • Save rdblue/1d39763be41b59b8637d50162f7fe2ea to your computer and use it in GitHub Desktop.
Save rdblue/1d39763be41b59b8637d50162f7fe2ea to your computer and use it in GitHub Desktop.
package org.apache.iceberg.types;
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
/**
 * Visitor that traverses a {@link Schema} or {@link Type} tree together with a parallel "partner" value.
 * <p>
 * For every node visited, the partner of each child is looked up through a {@link PartnerAccessors}. When a node has
 * no partner ({@code null}), the accessors are not consulted and {@code null} is handed down to all of its children.
 * <p>
 * Every callback has a default implementation: the before/after hooks do nothing, and the result callbacks return
 * {@code null}. Subclasses override only what they need.
 *
 * @param <P> type of the partner value tracked alongside each schema node
 * @param <R> type of the result produced by the result callbacks
 */
public abstract class SchemaWithPartnerVisitor<P, R> {

  /**
   * Looks up the partner of a child node from the partner of its parent.
   *
   * @param <P> type of the partner value
   */
  public interface PartnerAccessors<P> {
    /** Returns the partner of the struct field with the given id and name, given the struct's own partner. */
    P fieldPartner(P partnerStruct, int fieldId, String name);

    /** Returns the partner of a map's key, given the map's own partner. */
    P mapKeyPartner(P partnerMap);

    /** Returns the partner of a map's value, given the map's own partner. */
    P mapValuePartner(P partnerMap);

    /** Returns the partner of a list's element, given the list's own partner. */
    P listElementPartner(P partnerList);
  }

  /**
   * Visits a schema: the schema's struct is traversed first, then {@link #schema(Schema, Object, Object)} is called
   * with the struct's result.
   *
   * @param schema the schema to traverse
   * @param partner partner of the schema's root struct
   * @param visitor callbacks to invoke
   * @param accessors used to find the partners of nested nodes
   * @return whatever the visitor's {@code schema} callback returns
   */
  public static <P, T> T visit(Schema schema, P partner,
                               SchemaWithPartnerVisitor<P, T> visitor, PartnerAccessors<P> accessors) {
    T rootResult = visit(schema.asStruct(), partner, visitor, accessors);
    return visitor.schema(schema, partner, rootResult);
  }

  /**
   * Visits a type depth-first, dispatching on its type id. Structs, lists and maps recurse into their children;
   * every other type is passed to {@link #primitive(Type.PrimitiveType, Object)}.
   *
   * @param type the type to traverse
   * @param partner partner of {@code type}, or {@code null} if it has none
   * @param visitor callbacks to invoke
   * @param accessors used to find the partners of nested nodes
   * @return the result of the visitor callback for {@code type}
   */
  public static <P, T> T visit(Type type, P partner,
                               SchemaWithPartnerVisitor<P, T> visitor, PartnerAccessors<P> accessors) {
    switch (type.typeId()) {
      case STRUCT:
        return visitStruct(type.asNestedType().asStructType(), partner, visitor, accessors);

      case LIST:
        return visitList(type.asNestedType().asListType(), partner, visitor, accessors);

      case MAP:
        return visitMap(type.asNestedType().asMapType(), partner, visitor, accessors);

      default:
        return visitor.primitive(type.asPrimitiveType(), partner);
    }
  }

  /** Visits each field of a struct in declaration order and combines the per-field results. */
  private static <P, T> T visitStruct(Types.StructType struct, P partner,
                                      SchemaWithPartnerVisitor<P, T> visitor, PartnerAccessors<P> accessors) {
    List<Types.NestedField> children = struct.fields();
    List<T> childResults = Lists.newArrayListWithExpectedSize(children.size());

    for (Types.NestedField child : children) {
      // a struct without a partner cannot supply partners for its fields
      P childPartner = null;
      if (partner != null) {
        childPartner = accessors.fieldPartner(partner, child.fieldId(), child.name());
      }

      visitor.beforeField(child, childPartner);
      T typeResult;
      try {
        typeResult = visit(child.type(), childPartner, visitor, accessors);
      } finally {
        // the after hook runs even when visiting the field's type throws
        visitor.afterField(child, childPartner);
      }

      childResults.add(visitor.field(child, childPartner, typeResult));
    }

    return visitor.struct(struct, partner, childResults);
  }

  /** Visits a list's element type, wrapped in the list-element hooks. */
  private static <P, T> T visitList(Types.ListType list, P partner,
                                    SchemaWithPartnerVisitor<P, T> visitor, PartnerAccessors<P> accessors) {
    Types.NestedField element = list.field(list.elementId());

    P elementPartner = null;
    if (partner != null) {
      elementPartner = accessors.listElementPartner(partner);
    }

    visitor.beforeListElement(element, elementPartner);
    T elementResult;
    try {
      elementResult = visit(list.elementType(), elementPartner, visitor, accessors);
    } finally {
      visitor.afterListElement(element, elementPartner);
    }

    return visitor.list(list, partner, elementResult);
  }

  /** Visits a map's key type and then its value type, each wrapped in the matching hooks. */
  private static <P, T> T visitMap(Types.MapType map, P partner,
                                   SchemaWithPartnerVisitor<P, T> visitor, PartnerAccessors<P> accessors) {
    // key side is completed (including its after hook) before the value side starts
    Types.NestedField key = map.field(map.keyId());
    P partnerOfKey = null;
    if (partner != null) {
      partnerOfKey = accessors.mapKeyPartner(partner);
    }

    visitor.beforeMapKey(key, partnerOfKey);
    T keyResult;
    try {
      keyResult = visit(map.keyType(), partnerOfKey, visitor, accessors);
    } finally {
      visitor.afterMapKey(key, partnerOfKey);
    }

    Types.NestedField value = map.field(map.valueId());
    P partnerOfValue = null;
    if (partner != null) {
      partnerOfValue = accessors.mapValuePartner(partner);
    }

    visitor.beforeMapValue(value, partnerOfValue);
    T valueResult;
    try {
      valueResult = visit(map.valueType(), partnerOfValue, visitor, accessors);
    } finally {
      visitor.afterMapValue(value, partnerOfValue);
    }

    return visitor.map(map, partner, keyResult, valueResult);
  }

  /** Hook called before a struct field's type is visited; does nothing by default. */
  public void beforeField(Types.NestedField field, P partnerField) {
  }

  /** Hook called after a struct field's type was visited; does nothing by default. */
  public void afterField(Types.NestedField field, P partnerField) {
  }

  /** Hook called before a list's element type is visited; delegates to {@link #beforeField} by default. */
  public void beforeListElement(Types.NestedField elementField, P partnerField) {
    beforeField(elementField, partnerField);
  }

  /** Hook called after a list's element type was visited; delegates to {@link #afterField} by default. */
  public void afterListElement(Types.NestedField elementField, P partnerField) {
    afterField(elementField, partnerField);
  }

  /** Hook called before a map's key type is visited; delegates to {@link #beforeField} by default. */
  public void beforeMapKey(Types.NestedField keyField, P partnerField) {
    beforeField(keyField, partnerField);
  }

  /** Hook called after a map's key type was visited; delegates to {@link #afterField} by default. */
  public void afterMapKey(Types.NestedField keyField, P partnerField) {
    afterField(keyField, partnerField);
  }

  /** Hook called before a map's value type is visited; delegates to {@link #beforeField} by default. */
  public void beforeMapValue(Types.NestedField valueField, P partnerField) {
    beforeField(valueField, partnerField);
  }

  /** Hook called after a map's value type was visited; delegates to {@link #afterField} by default. */
  public void afterMapValue(Types.NestedField valueField, P partnerField) {
    afterField(valueField, partnerField);
  }

  /** Produces the result for a whole schema from its root struct's result; returns {@code null} by default. */
  public R schema(Schema schema, P partner, R structResult) {
    return null;
  }

  /** Produces the result for a struct from its fields' results; returns {@code null} by default. */
  public R struct(Types.StructType struct, P partner, List<R> fieldResults) {
    return null;
  }

  /** Produces the result for a struct field from its type's result; returns {@code null} by default. */
  public R field(Types.NestedField field, P partner, R fieldResult) {
    return null;
  }

  /** Produces the result for a list from its element's result; returns {@code null} by default. */
  public R list(Types.ListType list, P partner, R elementResult) {
    return null;
  }

  /** Produces the result for a map from its key and value results; returns {@code null} by default. */
  public R map(Types.MapType map, P partner, R keyResult, R valueResult) {
    return null;
  }

  /** Produces the result for a primitive type; returns {@code null} by default. */
  public R primitive(Type.PrimitiveType primitive, P partner) {
    return null;
  }
}
package org.apache.iceberg.types;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.iceberg.Schema;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
/**
 * Visitor class that accumulates the set of changes needed to evolve an existing schema into the union of the existing
 * and a new schema. Changes are added to an {@link UpdateSchema} operation.
 * <p>
 * The new schema is the one being traversed. The partner of each node is the id of the same-named field in the
 * existing schema, {@link #ROOT_ID} for the existing schema's root struct, or {@code null} when the existing schema
 * has no such field. Each callback returns {@code true} when its node is missing from the existing schema.
 */
class UnionByNameVisitor extends SchemaWithPartnerVisitor<Integer, Boolean> {

  /** Partner id that stands for the root struct of the existing schema, which has no field id of its own. */
  private static final int ROOT_ID = -1;

  /**
   * Adds changes needed to produce a union of two schemas to an {@link UpdateSchema} operation.
   * <p>
   * Changes are accumulated to evolve the existingSchema into a union with newSchema.
   * <p>
   * This is a static entry point: the constructor is private, so callers cannot create an instance to call it on.
   *
   * @param api an UpdateSchema for adding changes
   * @param existingSchema an existing schema
   * @param newSchema a new schema to compare with the existing
   */
  static void visit(UpdateSchema api, Schema existingSchema, Schema newSchema) {
    SchemaWithPartnerVisitor.visit(newSchema, ROOT_ID,
        new UnionByNameVisitor(api, existingSchema),
        new PartnerIdByNameAccessors(existingSchema));
  }

  private final UpdateSchema api;
  private final Schema partnerSchema;

  private UnionByNameVisitor(UpdateSchema api, Schema partnerSchema) {
    this.api = api;
    this.partnerSchema = partnerSchema;
  }

  /**
   * Reconciles the fields of a struct that exists in both schemas: fields missing from the existing struct are added,
   * the others are updated in place.
   *
   * @param struct struct from the new schema
   * @param partnerId id of the matching struct in the existing schema, or {@code null} if there is none
   * @param isMissingList for each field of {@code struct}, in order, whether it is missing from the existing schema
   * @return {@code true} if the struct itself is missing from the existing schema
   */
  @Override
  public Boolean struct(Types.StructType struct, Integer partnerId, List<Boolean> isMissingList) {
    if (partnerId == null) {
      // nothing to reconcile here: the enclosing struct adds this whole struct as one new column
      return true;
    }

    List<Types.NestedField> fields = struct.fields();
    Types.StructType partnerStruct = findFieldType(partnerId).asStructType();
    IntStream.range(0, isMissingList.size())
        .forEach(pos -> {
          Boolean isMissing = isMissingList.get(pos);
          Types.NestedField field = fields.get(pos);
          if (isMissing) {
            addColumn(partnerId, field);
          } else {
            updateColumn(field, partnerStruct.field(field.name()));
          }
        });

    return false;
  }

  /** Reports a struct field as missing when it has no partner; the result of its type is not used. */
  @Override
  public Boolean field(Types.NestedField field, Integer partnerId, Boolean isFieldMissing) {
    return partnerId == null;
  }

  /**
   * Updates the element of a list that exists in both schemas.
   *
   * @return {@code true} if the list itself is missing from the existing schema
   * @throws IllegalStateException if the list has a partner but its element was reported missing
   */
  @Override
  public Boolean list(Types.ListType list, Integer partnerId, Boolean isElementMissing) {
    if (partnerId == null) {
      return true;
    }

    Preconditions.checkState(!isElementMissing, "Error traversing schemas: element is missing, but list is present");

    Types.ListType partnerList = findFieldType(partnerId).asListType();
    updateColumn(list.field(list.elementId()), partnerList.field(partnerList.elementId()));

    return false;
  }

  /**
   * Updates the key and value of a map that exists in both schemas.
   *
   * @return {@code true} if the map itself is missing from the existing schema
   * @throws IllegalStateException if the map has a partner but its key or value was reported missing
   */
  @Override
  public Boolean map(Types.MapType map, Integer partnerId, Boolean isKeyMissing, Boolean isValueMissing) {
    if (partnerId == null) {
      return true;
    }

    Preconditions.checkState(!isKeyMissing, "Error traversing schemas: key is missing, but map is present");
    Preconditions.checkState(!isValueMissing, "Error traversing schemas: value is missing, but map is present");

    Types.MapType partnerMap = findFieldType(partnerId).asMapType();
    updateColumn(map.field(map.keyId()), partnerMap.field(partnerMap.keyId()));
    updateColumn(map.field(map.valueId()), partnerMap.field(partnerMap.valueId()));

    return false;
  }

  /** Reports a primitive as missing when it has no partner. */
  @Override
  public Boolean primitive(Type.PrimitiveType primitive, Integer partnerId) {
    return partnerId == null;
  }

  /** Returns the existing schema's type for a partner id, mapping {@link #ROOT_ID} to the root struct. */
  private Type findFieldType(int fieldId) {
    if (fieldId == ROOT_ID) {
      return partnerSchema.asStruct();
    } else {
      return partnerSchema.findField(fieldId).type();
    }
  }

  /** Adds {@code field} (name, type and doc) as a new column under the existing field with id {@code parentId}. */
  private void addColumn(int parentId, Types.NestedField field) {
    // NOTE(review): for ROOT_ID no column matches, so parentName is presumably null and the column is added at the
    // top level -- confirm against Schema.findColumnName and UpdateSchema.addColumn
    String parentName = partnerSchema.findColumnName(parentId);
    api.addColumn(parentName, field.name(), field.type(), field.doc());
  }

  /**
   * Records the updates needed to make {@code existingField} compatible with {@code field}:
   * required to optional, a changed primitive type, and a changed doc string.
   */
  private void updateColumn(Types.NestedField field, Types.NestedField existingField) {
    String fullName = partnerSchema.findColumnName(existingField.fieldId());

    // only ever relax nullability; a new required field never makes an existing optional field required
    boolean needsOptionalUpdate = field.isOptional() && existingField.isRequired();
    // nested types are reconciled by their own callbacks, so only primitive type changes are handled here
    boolean needsTypeUpdate = field.type().isPrimitiveType() && !field.type().equals(existingField.type());
    // a null doc in the new schema leaves the existing doc untouched
    boolean needsDocUpdate = field.doc() != null && !field.doc().equals(existingField.doc());

    if (needsOptionalUpdate) {
      api.makeColumnOptional(fullName);
    }

    if (needsTypeUpdate) {
      api.updateColumn(fullName, field.type().asPrimitiveType());
    }

    if (needsDocUpdate) {
      api.updateColumnDoc(fullName, field.doc());
    }
  }

  /** Finds partners by looking up field ids in the existing schema; struct fields are matched by name. */
  private static class PartnerIdByNameAccessors implements PartnerAccessors<Integer> {
    private final Schema partnerSchema;

    private PartnerIdByNameAccessors(Schema partnerSchema) {
      this.partnerSchema = partnerSchema;
    }

    /**
     * Returns the id of the field called {@code name} inside the existing struct identified by
     * {@code partnerStructId}, or {@code null} if that struct has no such field.
     */
    @Override
    public Integer fieldPartner(Integer partnerStructId, int fieldId, String name) {
      Types.StructType struct;
      // the root check must look at the partner id: fieldId belongs to the new schema and is never ROOT_ID
      if (partnerStructId == ROOT_ID) {
        struct = partnerSchema.asStruct();
      } else {
        struct = partnerSchema.findField(partnerStructId).type().asStructType();
      }

      Types.NestedField field = struct.field(name);
      if (field != null) {
        return field.fieldId();
      }

      return null;
    }

    /** Returns the key field id of the existing map, or {@code null} if the map id is not in the schema. */
    @Override
    public Integer mapKeyPartner(Integer partnerMapId) {
      Types.NestedField mapField = partnerSchema.findField(partnerMapId);
      if (mapField != null) {
        return mapField.type().asMapType().keyId();
      }

      return null;
    }

    /** Returns the value field id of the existing map, or {@code null} if the map id is not in the schema. */
    @Override
    public Integer mapValuePartner(Integer partnerMapId) {
      Types.NestedField mapField = partnerSchema.findField(partnerMapId);
      if (mapField != null) {
        return mapField.type().asMapType().valueId();
      }

      return null;
    }

    /** Returns the element field id of the existing list, or {@code null} if the list id is not in the schema. */
    @Override
    public Integer listElementPartner(Integer partnerListId) {
      Types.NestedField listField = partnerSchema.findField(partnerListId);
      if (listField != null) {
        return listField.type().asListType().elementId();
      }

      return null;
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment