Beam-Pipeline.java
package com.redacted.redacted.processing.beam.errorrate;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.extensions.jackson.ParseJsons;
import org.apache.beam.sdk.io.aws.options.AwsOptions;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.redacted.redacted.processing.beam.common.coders.Coders;
import com.redacted.redacted.processing.beam.common.kafka.Kafka;
import com.redacted.redacted.processing.beam.common.kinesis.KinesisClientsProvider;
import com.redacted.redacted.processing.beam.common.kinesis.WriteKinesisRecords;
import com.redacted.redacted.processing.beam.common.newrelic.NewRelicRecord;
import com.redacted.redacted.processing.beam.common.options.OptionsParser;
/** Main streaming class that computes per-application error rates. */
public final class ErrorRate {

    /** Private constructor: utility class, not meant to be instantiated. */
    private ErrorRate() { }

    /** Logger for this class. */
    private static final Logger LOGGER = LoggerFactory.getLogger(ErrorRate.class);

    /**
     * Entry point: parses pipeline options and runs the error-rate pipeline.
     * @param args command-line arguments
     * @throws Exception if a problem occurs
     */
    public static void main(final String[] args) throws Exception {
        LOGGER.info("redacted-Logger: Entering main");
        @SuppressWarnings("checkstyle:linelength")
        final ErrorRatePipelineOptions options = new OptionsParser<ErrorRatePipelineOptions>().getOptions(args, ErrorRatePipelineOptions.class);
        LOGGER.info("redacted-Logger: Entering runErrorRate and running with {}", options);
        runErrorRate(options);
    }
    /**
     * Processes the Kafka stream and calculates the error rate per application.
     * @param options pipeline options
     */
    private static void runErrorRate(final ErrorRatePipelineOptions options) {
        LOGGER.info("redacted-Logger: Options are: {}", options);
        // TODO: replace these constants with values from kafkaOptions;
        // doing so currently triggers an error that still needs investigation.
        String brokers = Config.KAFKA_BROKERS;
        String topic = Config.KAFKA_TOPIC;
        Pipeline pipeline = Pipeline.create(options);
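        // Pipeline.create(options) selects the runner from the supplied
        // PipelineOptions (the Flink-prefixed options used below suggest this
        // job targets the Flink runner, though any Beam runner could be passed in).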
LOGGER.info("redacted-Logger: Retrieving New Relic records."); | |
PCollection<NewRelicRecord> newRelicRecords = pipeline.apply(Kafka.readKafkaTopic(brokers, topic)) | |
.apply( | |
ParDo.of( | |
new DoFn<KafkaRecord<Long, String>, String>() { | |
@ProcessElement | |
public void processElement(final ProcessContext processContext) { | |
KafkaRecord<Long, String> record = processContext.element(); | |
LOGGER.info("redacted-Logger: Found Kafka value of {}", record.getKV().getValue()); | |
processContext.output(record.getKV().getValue()); | |
} | |
} | |
) | |
) | |
.apply(ParseJsons.of(NewRelicRecord.class)).setCoder(SerializableCoder.of(NewRelicRecord.class)); | |
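        // ParseJsons (from Beam's Jackson extension) deserializes each JSON
        // string into a NewRelicRecord; the explicit SerializableCoder is
        // needed because Beam typically cannot infer a coder for the POJO
        // output of ParseJsons on its own.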
        // Key each New Relic event by application name so that records can be
        // grouped and aggregated per application later.
        // https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/WithKeys
        PCollection<KV<String, NewRelicRecord>> keyedNewRelicRecords =
            newRelicRecords
                .apply(
                    Window.<NewRelicRecord>into(FixedWindows.of(Duration.standardSeconds(options.getFlinkWindow())))
                        .discardingFiredPanes()
                        .withAllowedLateness(Duration.standardSeconds(options.getFlinkAllowedLateness())))
                .apply(
                    WithKeys.of(new SerializableFunction<NewRelicRecord, String>() {
                        @Override
                        public String apply(final NewRelicRecord record) {
                            return record.getAppname();
                        }
                    }));
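        // Note on the windowing above: discardingFiredPanes() means elements
        // are not carried over between firings of the same window, and records
        // arriving within the allowed lateness fire additional panes that
        // contain only the late data.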
        // Aggregate the New Relic records per application key.
        PCollection<KV<String, ErrorRateResult>> aggregatedRecords =
            keyedNewRelicRecords.apply(
                Combine.<String, NewRelicRecord, ErrorRateResult>perKey(
                    new NewRelicRecordAggregatorFn()));

        // Encode each KV<String, ErrorRateResult> to a byte array for Kinesis.
        PCollection<byte[]> encodedRecords =
            aggregatedRecords.apply("Map to byte array for KinesisStream", MapElements.via(new MapKVToByteArray()));
        encodedRecords.apply("WriteToKinesis",
            new WriteKinesisRecords(new KinesisClientsProvider(options.as(AwsOptions.class))));

        pipeline.run().waitUntilFinish();
    }
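
    // NewRelicRecordAggregatorFn is referenced above but not included in this
    // gist. The class below is NOT the author's implementation: it is a
    // hypothetical sketch of a CombineFn with the matching type signature,
    // assuming NewRelicRecord exposes an isError() flag and ErrorRateResult
    // has a (long errors, long total) constructor.
    private static class ErrorRateCombineFnSketch
            extends Combine.CombineFn<NewRelicRecord, KV<Long, Long>, ErrorRateResult> {
        @Override
        public KV<Long, Long> createAccumulator() {
            return KV.of(0L, 0L); // (errorCount, totalCount)
        }

        @Override
        public KV<Long, Long> addInput(final KV<Long, Long> acc, final NewRelicRecord input) {
            // isError() is an assumed accessor on NewRelicRecord.
            return KV.of(acc.getKey() + (input.isError() ? 1 : 0), acc.getValue() + 1);
        }

        @Override
        public KV<Long, Long> mergeAccumulators(final Iterable<KV<Long, Long>> accumulators) {
            long errors = 0;
            long total = 0;
            for (KV<Long, Long> acc : accumulators) {
                errors += acc.getKey();
                total += acc.getValue();
            }
            return KV.of(errors, total);
        }

        @Override
        public ErrorRateResult extractOutput(final KV<Long, Long> acc) {
            // Assumed constructor; the real ErrorRateResult may differ.
            return new ErrorRateResult(acc.getKey(), acc.getValue());
        }
    }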
    /** Encodes the aggregated per-application result value for writing to Kinesis. */
    private static class MapKVToByteArray extends SimpleFunction<KV<String, ErrorRateResult>, byte[]> {
        @Override
        public byte[] apply(final KV<String, ErrorRateResult> input) {
            return Coders.encodeInputElement(input.getValue());
        }
    }
}