@sebge2emasphere
Created June 22, 2018 13:24
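context
    .getEntryJoiner()
    // Join the entries read from flow A with those read from flow B.
    .join(
        entriesFromFlow(
            request.getFlowDefinitionA(),
            request.getInjectionFlowA(),
            request.getFlowDefinition().getInjectionConfiguration().getFlowAMatching(),
            context
        ),
        entriesFromFlow(
            request.getFlowDefinitionB(),
            request.getInjectionFlowB(),
            request.getFlowDefinition().getInjectionConfiguration().getFlowBMatching(),
            context
        )
    )
    // Merge each joined result into a key/value pair, then discard nulls.
    .mapToPair(
        context.getEntryMerger()
    )
    .filter(Objects::nonNull)
    // Pair each merged entry with the already existing entry for its key, if any.
    .leftOuterJoin(
        existingEntries(context)
    )
    // Resolve collisions between merged and existing entries, then discard nulls.
    .map(
        context.getCollisionResolver()
    )
    .filter(Objects::nonNull)
    // Convert each remaining flow entry into HBase rows.
    .flatMapToPair(
        context.getFlowEntryToHBaseRowConverter()
    )
    // Write the rows out through the HBase TableOutputFormat.
    .saveAsNewAPIHadoopFile(
        FlowExecutorUtils.TABLE,
        ImmutableBytesWritable.class,
        Result.class,
        TableOutputFormat.class,
        context.getConfiguration()
    );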