Skip to content

Instantly share code, notes, and snippets.

@brachi-wernick
Last active March 6, 2019 13:06
Show Gist options
  • Save brachi-wernick/40a0eb366f529b7b7df0f2522284b14e to your computer and use it in GitHub Desktop.
Save brachi-wernick/40a0eb366f529b7b7df0f2522284b14e to your computer and use it in GitHub Desktop.
EventToTableRowTransformerFn
public class EventToTableRowTransformerFn extends DoFn<JsonNode, TableRow> {
private FieldsConfigurationProvider fieldsConfigurationProvider;
public EventToTableRowTransformerFn(String filedConfigurationBucketName, String filedConfigurationFilePath, Long filedConfigurationCheckInterval) {
this.filedConfigurationBucketName = filedConfigurationBucketName;
this.filedConfigurationFilePath = filedConfigurationFilePath;
this.filedConfigurationCheckInterval = filedConfigurationCheckInterval;
}
@Setup
public void setUp() {
FieldsConfigurationProvider.init(filedConfigurationBucketName, filedConfigurationCheckInterval, filedConfigurationFilePath);
fieldsConfigurationProvider = FieldsConfigurationProvider.getInstance();
}
@ProcessElement
public void processElement(@Element JsonNode element, OutputReceiver<TableRow> out, ProcessContext context) {
TableRow convertedRow = new TableRow();
FieldsConfiguration config = fieldsConfigurationProvider.getConfig();
if(config==null){
throw new RuntimeException("empty config, check configuration file in "+ FieldsConfigurationProvider.bucketName +" file"+ FieldsConfigurationProvider.filePath);
}
for (FieldConfig fieldConfig : config.getFields()) {
JsonNode extracted = element.get(fieldConfig.getJsonpath());
switch (fieldConfig.getType()) {
case STRING:
transformer = v -> v;
break;
case FLOAT64:
transformer = Float::parseFloat;
break;
case INT64:
transformer = Long::parseLong;
break;
case TIMESTAMP:
transformer = new DateTransformer();
break;
case BOOL:
transformer = Boolean::valueOf;
break;
}
insertValue(FieldConfig, fieldConfig.getColumnname(), convertedRow, transformer);
context.output(convertedRow);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment