Last active
March 6, 2019 13:06
-
-
Save brachi-wernick/40a0eb366f529b7b7df0f2522284b14e to your computer and use it in GitHub Desktop.
EventToTableRowTransformerFn
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class EventToTableRowTransformerFn extends DoFn<JsonNode, TableRow> { | |
private FieldsConfigurationProvider fieldsConfigurationProvider; | |
public EventToTableRowTransformerFn(String filedConfigurationBucketName, String filedConfigurationFilePath, Long filedConfigurationCheckInterval) { | |
this.filedConfigurationBucketName = filedConfigurationBucketName; | |
this.filedConfigurationFilePath = filedConfigurationFilePath; | |
this.filedConfigurationCheckInterval = filedConfigurationCheckInterval; | |
} | |
@Setup | |
public void setUp() { | |
FieldsConfigurationProvider.init(filedConfigurationBucketName, filedConfigurationCheckInterval, filedConfigurationFilePath); | |
fieldsConfigurationProvider = FieldsConfigurationProvider.getInstance(); | |
} | |
@ProcessElement | |
public void processElement(@Element JsonNode element, OutputReceiver<TableRow> out, ProcessContext context) { | |
TableRow convertedRow = new TableRow(); | |
FieldsConfiguration config = fieldsConfigurationProvider.getConfig(); | |
if(config==null){ | |
throw new RuntimeException("empty config, check configuration file in "+ FieldsConfigurationProvider.bucketName +" file"+ FieldsConfigurationProvider.filePath); | |
} | |
for (FieldConfig fieldConfig : config.getFields()) { | |
JsonNode extracted = element.get(fieldConfig.getJsonpath()); | |
switch (fieldConfig.getType()) { | |
case STRING: | |
transformer = v -> v; | |
break; | |
case FLOAT64: | |
transformer = Float::parseFloat; | |
break; | |
case INT64: | |
transformer = Long::parseLong; | |
break; | |
case TIMESTAMP: | |
transformer = new DateTransformer(); | |
break; | |
case BOOL: | |
transformer = Boolean::valueOf; | |
break; | |
} | |
insertValue(FieldConfig, fieldConfig.getColumnname(), convertedRow, transformer); | |
context.output(convertedRow); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment