Last active
February 23, 2024 00:51
-
-
Save egalpin/1831808e5676c0b278c9732e42fab4e5 to your computer and use it in GitHub Desktop.
JsonMutationFn using Beam and Jackson Databind
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.fasterxml.jackson.core.JsonPointer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
/** | |
* This DoFn is mostly used to rename json fields, but is flexible enough, thanks to json pointers, | |
* to support renaming an entity from a json source document that might be arbitrarily nested or | |
* buried and rename it to a top-level field. | |
*/ | |
public class JsonMutationFn extends DoFn<String, String> { | |
private static final JsonPointer TERMINAL_NODE = JsonPointer.compile(""); | |
private final List<NodeMutationFn> _mutations; | |
private static final JsonMapper MAPPER = new JsonMapper(); | |
private JsonMutationFn(List<NodeMutationFn> mutations) { | |
_mutations = mutations; | |
} | |
private static JsonNode safeReadTree(JsonMapper mapper, String input) { | |
try { | |
return mapper.readTree(input); | |
} catch (IOException ex) { | |
return mapper.createObjectNode(); | |
} | |
} | |
private static JsonNode safeReadTree(String input) { | |
return safeReadTree(MAPPER, input); | |
} | |
private static ObjectNode safeReadAsObject(String input) { | |
return (ObjectNode) safeReadTree(input); | |
} | |
public static Builder builder() { | |
return new Builder(); | |
} | |
public static class Builder { | |
private final List<NodeMutationFn> _mutations; | |
private Builder() { | |
_mutations = new ArrayList<>(); | |
} | |
private static void validateIsRootLevelPtr(JsonPointer ptr) { | |
if (!ptr.equals(TERMINAL_NODE) && !ptr.tail().equals(TERMINAL_NODE)) { | |
throw new IllegalArgumentException( | |
String.format( | |
"Target fields must be at the root level. Offending JsonPointer: %s", ptr)); | |
} | |
} | |
public Builder move(String source, String target) { | |
JsonPointer targetPtr = JsonPointer.compile(target); | |
// validateIsRootLevelPtr(targetPtr); | |
_mutations.add(ObjectNodeMutation.move(JsonPointer.compile(source), targetPtr)); | |
return this; | |
} | |
public Builder copy(String source, String target) { | |
JsonPointer targetPtr = JsonPointer.compile(target); | |
// validateIsRootLevelPtr(targetPtr); | |
_mutations.add(ObjectNodeMutation.copy(JsonPointer.compile(source), targetPtr)); | |
return this; | |
} | |
public Builder remove(String source) { | |
JsonPointer sourcePtr = JsonPointer.compile(source); | |
_mutations.add(ObjectNodeMutation.remove(sourcePtr)); | |
return this; | |
} | |
public Builder add(String target, String value) { | |
JsonPointer targetPtr = JsonPointer.compile(target); | |
_mutations.add(ObjectNodeMutation.add(targetPtr, value)); | |
return this; | |
} | |
public Builder add(String target, long value) { | |
JsonPointer targetPtr = JsonPointer.compile(target); | |
_mutations.add(ObjectNodeMutation.add(targetPtr, value)); | |
return this; | |
} | |
public Builder add(String target, int value) { | |
JsonPointer targetPtr = JsonPointer.compile(target); | |
_mutations.add(ObjectNodeMutation.add(targetPtr, value)); | |
return this; | |
} | |
public Builder add(String target, double value) { | |
JsonPointer targetPtr = JsonPointer.compile(target); | |
_mutations.add(ObjectNodeMutation.add(targetPtr, value)); | |
return this; | |
} | |
public Builder add(String target, boolean value) { | |
JsonPointer targetPtr = JsonPointer.compile(target); | |
_mutations.add(ObjectNodeMutation.add(targetPtr, value)); | |
return this; | |
} | |
public Builder addToArray(String target, String value) { | |
JsonPointer targetPtr = JsonPointer.compile(target); | |
_mutations.add(ObjectNodeMutation.addToArray(targetPtr, value)); | |
return this; | |
} | |
public Builder addToArray(String target, long value) { | |
JsonPointer targetPtr = JsonPointer.compile(target); | |
_mutations.add(ObjectNodeMutation.addToArray(targetPtr, value)); | |
return this; | |
} | |
public Builder addToArray(String target, int value) { | |
JsonPointer targetPtr = JsonPointer.compile(target); | |
_mutations.add(ObjectNodeMutation.addToArray(targetPtr, value)); | |
return this; | |
} | |
public Builder addToArray(String target, double value) { | |
JsonPointer targetPtr = JsonPointer.compile(target); | |
_mutations.add(ObjectNodeMutation.addToArray(targetPtr, value)); | |
return this; | |
} | |
public Builder addToArray(String target, boolean value) { | |
JsonPointer targetPtr = JsonPointer.compile(target); | |
_mutations.add(ObjectNodeMutation.addToArray(targetPtr, value)); | |
return this; | |
} | |
public JsonMutationFn build() { | |
return new JsonMutationFn(_mutations); | |
} | |
} | |
@VisibleForTesting | |
public ObjectNode applyMutations(ObjectNode inputNode) { | |
for (NodeMutationFn fn : _mutations) { | |
fn.apply(inputNode); | |
} | |
return inputNode; | |
} | |
@Override | |
public void processElement(DoFn<String, String>.ProcessContext c) throws Exception { | |
ObjectNode inputNode = safeReadAsObject(c.element()); | |
c.output(applyMutations(inputNode).toString()); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import static net.javacrumbs.jsonunit.JsonMatchers.jsonEquals; | |
import static org.hamcrest.MatcherAssert.assertThat; | |
import org.junit.Test; | |
public class JsonMutationFnTest { | |
public static String getJsonResourceAsString(String resourceName) | |
throws IOException, URISyntaxException { | |
URI inputJsonPath = this.getClass().getClassLoader().getResource(resourceName).toURI(); | |
return new String(Files.readAllBytes(Paths.get(inputJsonPath)), StandardCharsets.UTF_8); | |
} | |
@Test | |
public void testMoveJsonNodes() throws Exception { | |
String input = getJsonResourceAsString("mutate_json_nodes_input.json"); | |
String output = | |
Utils.safeReadTree(getJsonResourceAsString("mutate_json_nodes_output.json")) | |
.toString(); | |
JsonMutationFn jsonMutations = | |
JsonMutationFn.builder() | |
.copy("/foo", "/foo_copy") | |
.move("/i_am_a_missing_field", "/missing") | |
.move("/i_am/a_missing/nested_field", "/missing_nested") | |
.move("/foo", "/foo_nest/test") | |
.move("/baz/jazz/aardvark", "/preserved_array") | |
.move("/preserved_array/0", "/a_from_aardvark") | |
.addToArray("/preserved_array", 0.159) | |
.add("/added_long", 0L) | |
.add("/added_string", "hi") | |
.add("/added_double", 1.0) | |
.remove("/baz/jazz") | |
.build(); | |
assertThat(jsonMutations.applyMutations(Utils.safeReadAsObject(input)), jsonEquals(output)); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "foo": "bar",
  "baz": {
    "jazz": {
      "aardvark": ["a", "b", "c", 4]
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "a_from_aardvark": "a",
  "added_double": 1.0,
  "added_long": 0,
  "added_string": "hi",
  "baz": {},
  "foo_copy": "bar",
  "foo_nest": {
    "test": "bar"
  },
  "preserved_array": ["b", "c", 4, 0.159]
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.fasterxml.jackson.databind.node.ObjectNode; | |
import org.apache.beam.sdk.transforms.SerializableFunction; | |
abstract class NodeMutationFn implements SerializableFunction<ObjectNode, ObjectNode> { | |
public abstract ObjectNode apply(ObjectNode node); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.fasterxml.jackson.core.JsonPointer; | |
import com.fasterxml.jackson.databind.JsonNode; | |
import com.fasterxml.jackson.databind.node.ArrayNode; | |
import com.fasterxml.jackson.databind.node.BooleanNode; | |
import com.fasterxml.jackson.databind.node.DoubleNode; | |
import com.fasterxml.jackson.databind.node.IntNode; | |
import com.fasterxml.jackson.databind.node.LongNode; | |
import com.fasterxml.jackson.databind.node.ObjectNode; | |
import com.fasterxml.jackson.databind.node.TextNode; | |
public class ObjectNodeMutation { | |
private static final JsonMapper MAPPER = new JsonMapper(); | |
static JsonNode getParentNode(ObjectNode inputNode, JsonPointer ptr) { | |
return inputNode.at(ptr.head()); | |
} | |
private static String getFieldName(JsonPointer ptr) { | |
return ptr.last().getMatchingProperty(); | |
} | |
private static JsonNode getExistingNode( | |
ObjectNode inputNode, JsonPointer ptr, boolean shouldRemoveNode) { | |
JsonNode parentNode = getParentNode(inputNode, ptr); | |
JsonNode existingValue; | |
if (shouldRemoveNode) { | |
if (parentNode.isArray()) { | |
existingValue = ((ArrayNode) parentNode).remove(ptr.last().getMatchingIndex()); | |
} else { | |
existingValue = ((ObjectNode) parentNode).remove(ptr.last().getMatchingProperty()); | |
} | |
} else { | |
if (parentNode.isArray()) { | |
existingValue = parentNode.get(ptr.last().getMatchingIndex()); | |
} else { | |
existingValue = parentNode.get(ptr.last().getMatchingProperty()); | |
} | |
} | |
return existingValue; | |
} | |
// Simple conditional wrapper for mutations | |
public static NodeMutationFn conditionally( | |
@Nullable SerializableObjectNodePredicate condition, NodeMutationFn fn) { | |
return inputNode -> | |
condition == null || condition.test(inputNode) ? fn.apply(inputNode) : inputNode; | |
} | |
private static NodeMutationFn moveOrCopy( | |
JsonPointer fromField, JsonPointer toField, boolean shouldRemoveNode) { | |
return inputNode -> { | |
if (inputNode.at(fromField).isMissingNode()) { | |
// The field name to move from is not found in input at all. Note that this is distinct | |
// from the case where a NullNode is returned; in such a case, we still set the NullNode | |
// to the translated field. Simply return the input verbatim. | |
return inputNode; | |
} | |
JsonNode target = getParentNode(inputNode, toField); | |
if (target.isMissingNode()) { | |
target = inputNode.withObject(toField.head()); | |
} | |
String fieldName = getFieldName(toField); | |
((ObjectNode) target).set(fieldName, getExistingNode(inputNode, fromField, shouldRemoveNode)); | |
return inputNode; | |
}; | |
} | |
public static NodeMutationFn move(JsonPointer fromField, JsonPointer toField) { | |
return inputNode -> moveOrCopy(fromField, toField, true).apply(inputNode); | |
} | |
public static NodeMutationFn copy(JsonPointer fromField, JsonPointer toField) { | |
return inputNode -> moveOrCopy(fromField, toField, false).apply(inputNode); | |
} | |
public static NodeMutationFn remove(JsonPointer fromField) { | |
return inputNode -> { | |
if (inputNode.at(fromField).isMissingNode()) { | |
return inputNode; | |
} | |
getExistingNode(inputNode, fromField, true); | |
return inputNode; | |
}; | |
} | |
private static NodeMutationFn addToArray(JsonPointer targetField, JsonNode value) { | |
return inputNode -> { | |
JsonNode parent = getExistingNode(inputNode, targetField, false); | |
((ArrayNode) parent).add(value); | |
return inputNode; | |
}; | |
} | |
public static NodeMutationFn addToArray(JsonPointer targetField, String value) { | |
return inputNode -> addToArray(targetField, TextNode.valueOf(value)).apply(inputNode); | |
} | |
public static NodeMutationFn addToArray(JsonPointer targetField, long value) { | |
return inputNode -> addToArray(targetField, LongNode.valueOf(value)).apply(inputNode); | |
} | |
public static NodeMutationFn addToArray(JsonPointer targetField, int value) { | |
return inputNode -> addToArray(targetField, IntNode.valueOf(value)).apply(inputNode); | |
} | |
public static NodeMutationFn addToArray(JsonPointer targetField, double value) { | |
return inputNode -> addToArray(targetField, DoubleNode.valueOf(value)).apply(inputNode); | |
} | |
public static NodeMutationFn addToArray(JsonPointer targetField, boolean value) { | |
return inputNode -> addToArray(targetField, BooleanNode.valueOf(value)).apply(inputNode); | |
} | |
public static NodeMutationFn setNull(JsonPointer targetField) { | |
return inputNode -> { | |
JsonNode parent = getParentNode(inputNode, targetField); | |
((ObjectNode) parent).putNull(targetField.last().getMatchingProperty()); | |
return inputNode; | |
}; | |
} | |
static NodeMutationFn set(JsonPointer targetField, JsonNode value) { | |
return inputNode -> { | |
JsonNode parent = getParentNode(inputNode, targetField); | |
if (parent.isMissingNode()) { | |
set(targetField.head(), MAPPER.createObjectNode()).apply(inputNode); | |
parent = getParentNode(inputNode, targetField); | |
} | |
((ObjectNode) parent).set(targetField.last().getMatchingProperty(), value); | |
return inputNode; | |
}; | |
} | |
public static NodeMutationFn set(JsonPointer targetField, String value) { | |
return inputNode -> set(targetField, TextNode.valueOf(value)).apply(inputNode); | |
} | |
public static NodeMutationFn set(JsonPointer targetField, long value) { | |
return inputNode -> set(targetField, LongNode.valueOf(value)).apply(inputNode); | |
} | |
public static NodeMutationFn set(JsonPointer targetField, int value) { | |
return inputNode -> set(targetField, IntNode.valueOf(value)).apply(inputNode); | |
} | |
public static NodeMutationFn set(JsonPointer targetField, double value) { | |
return inputNode -> set(targetField, DoubleNode.valueOf(value)).apply(inputNode); | |
} | |
public static NodeMutationFn set(JsonPointer targetField, boolean value) { | |
return inputNode -> set(targetField, BooleanNode.valueOf(value)).apply(inputNode); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment