Created
October 7, 2019 19:23
-
-
Save gxercavins/cf163038ce432d21b20920aa419e1123 to your computer and use it in GitHub Desktop.
SO question 58268236
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.beam.examples; | |
import org.apache.beam.sdk.coders.AvroCoder; | |
import org.apache.beam.sdk.coders.DefaultCoder; | |
import org.apache.beam.sdk.options.PipelineOptions; | |
import org.apache.beam.sdk.options.PipelineOptionsFactory; | |
import org.apache.beam.sdk.Pipeline; | |
import org.apache.beam.sdk.transforms.Create; | |
import org.apache.beam.sdk.transforms.Combine; | |
import org.apache.beam.sdk.transforms.Combine.CombineFn; | |
import org.apache.beam.sdk.values.KV; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
public class CombineFnLogger { | |
public static class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> { | |
private static final Logger LOG = LoggerFactory.getLogger(AverageFn.class); | |
@DefaultCoder(AvroCoder.class) | |
public static class Accum { | |
int sum = 0; | |
int count = 0; | |
} | |
@Override | |
public Accum createAccumulator() { return new Accum(); } | |
@Override | |
public Accum addInput(Accum accum, Integer input) { | |
accum.sum += input; | |
accum.count++; | |
return accum; | |
} | |
@Override | |
public Accum mergeAccumulators(Iterable<Accum> accums) { | |
Accum merged = createAccumulator(); | |
for (Accum accum : accums) { | |
merged.sum += accum.sum; | |
merged.count += accum.count; | |
} | |
return merged; | |
} | |
@Override | |
public Double extractOutput(Accum accum) { | |
Double result = (double) accum.sum / accum.count; | |
LOG.info(String.format(">>> Logs >>> Result is %.1f", result)); | |
return result; | |
} | |
} | |
@SuppressWarnings("serial") | |
public static void main(String[] args) { | |
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class); | |
Pipeline p = Pipeline.create(options); | |
p | |
.apply("Create Data", Create.of(KV.of("key", 1), | |
KV.of("key", 4), | |
KV.of("key", 10))) | |
.apply("Combine per Key", Combine.<String, Integer, Double>perKey(new AverageFn())); | |
p.run(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oct 07, 2019 9:18:58 PM org.apache.beam.examples.CombineFnLogger$AverageFn extractOutput | |
INFO: >>> Logs >>> Result is 5.0 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Compiles CombineFnLogger with Maven and launches it with the DataflowRunner.
#
# Usage: ./run.sh project-id bucket-name
#   project-id   passed to the pipeline as --project
#   bucket-name  GCS bucket used for both the temp and the staging locations

if [ "$#" -ne 2 ]; then
  echo "Please specify the Project ID and GCS Bucket Name" >&2
  echo "Usage: ./run.sh project-id bucket-name" >&2
  # Exit non-zero so callers can detect the usage error.
  exit 1
fi

PROJECT=$1
BUCKET=$2
MAIN_CLASS=CombineFnLogger

# Put the Java 8 JDK first on PATH for the Maven build.
export PATH=/usr/lib/jvm/java-8-openjdk-amd64/bin/:$PATH

# Staging lives in the bucket given on the command line, same as the temp location.
mvn -Pdataflow-runner compile -e exec:java \
  -Dexec.mainClass="org.apache.beam.examples.$MAIN_CLASS" \
  -Dexec.args="--project=$PROJECT \
  --tempLocation=gs://$BUCKET/temp/ \
  --stagingLocation=gs://$BUCKET/staging/ \
  --runner=DataflowRunner"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<!-- | |
Licensed to the Apache Software Foundation (ASF) under one or more | |
contributor license agreements. See the NOTICE file distributed with | |
this work for additional information regarding copyright ownership. | |
The ASF licenses this file to You under the Apache License, Version 2.0 | |
(the "License"); you may not use this file except in compliance with | |
the License. You may obtain a copy of the License at | |
http://www.apache.org/licenses/LICENSE-2.0 | |
Unless required by applicable law or agreed to in writing, software | |
distributed under the License is distributed on an "AS IS" BASIS, | |
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
See the License for the specific language governing permissions and | |
limitations under the License. | |
--> | |
<project xmlns="http://maven.apache.org/POM/4.0.0" | |
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | |
<modelVersion>4.0.0</modelVersion> | |
<groupId>org.example</groupId> | |
<artifactId>combinefn-logger</artifactId> | |
<version>0.1</version> | |
<packaging>jar</packaging> | |
<properties> | |
<beam.version>2.15.0</beam.version> | |
<bigquery.version>v2-rev20181104-1.27.0</bigquery.version> | |
<google-clients.version>1.27.0</google-clients.version> | |
<guava.version>20.0</guava.version> | |
<hamcrest.version>2.1</hamcrest.version> | |
<jackson.version>2.9.8</jackson.version> | |
<joda.version>2.10.1</joda.version> | |
<junit.version>4.13-beta-1</junit.version> | |
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version> | |
<maven-exec-plugin.version>1.6.0</maven-exec-plugin.version> | |
<maven-jar-plugin.version>3.0.2</maven-jar-plugin.version> | |
<maven-shade-plugin.version>3.1.0</maven-shade-plugin.version> | |
<mockito.version>1.10.19</mockito.version> | |
<pubsub.version>v1-rev20181105-1.27.0</pubsub.version> | |
<slf4j.version>1.7.25</slf4j.version> | |
<spark.version>2.4.2</spark.version> | |
<hadoop.version>2.7.3</hadoop.version> | |
<maven-surefire-plugin.version>2.21.0</maven-surefire-plugin.version> | |
<nemo.version>0.1</nemo.version> | |
</properties> | |
<repositories> | |
<repository> | |
<id>apache.snapshots</id> | |
<name>Apache Development Snapshot Repository</name> | |
<url>https://repository.apache.org/content/repositories/snapshots/</url> | |
<releases> | |
<enabled>false</enabled> | |
</releases> | |
<snapshots> | |
<enabled>true</enabled> | |
</snapshots> | |
</repository> | |
</repositories> | |
<build> | |
<plugins> | |
<plugin> | |
<groupId>org.apache.maven.plugins</groupId> | |
<artifactId>maven-compiler-plugin</artifactId> | |
<version>${maven-compiler-plugin.version}</version> | |
<configuration> | |
<source>1.8</source> | |
<target>1.8</target> | |
</configuration> | |
</plugin> | |
<plugin> | |
<groupId>org.apache.maven.plugins</groupId> | |
<artifactId>maven-surefire-plugin</artifactId> | |
<version>${maven-surefire-plugin.version}</version> | |
<configuration> | |
<parallel>all</parallel> | |
<threadCount>4</threadCount> | |
<redirectTestOutputToFile>true</redirectTestOutputToFile> | |
</configuration> | |
<dependencies> | |
<dependency> | |
<groupId>org.apache.maven.surefire</groupId> | |
<artifactId>surefire-junit47</artifactId> | |
<version>${maven-surefire-plugin.version}</version> | |
</dependency> | |
</dependencies> | |
</plugin> | |
<!-- Ensure that the Maven jar plugin runs before the Maven | |
shade plugin by listing the plugin higher within the file. --> | |
<plugin> | |
<groupId>org.apache.maven.plugins</groupId> | |
<artifactId>maven-jar-plugin</artifactId> | |
<version>${maven-jar-plugin.version}</version> | |
</plugin> | |
<!-- | |
Configures `mvn package` to produce a bundled jar ("fat jar") for runners | |
that require this for job submission to a cluster. | |
--> | |
<plugin> | |
<groupId>org.apache.maven.plugins</groupId> | |
<artifactId>maven-shade-plugin</artifactId> | |
<version>${maven-shade-plugin.version}</version> | |
<executions> | |
<execution> | |
<phase>package</phase> | |
<goals> | |
<goal>shade</goal> | |
</goals> | |
<configuration> | |
<finalName>${project.artifactId}-bundled-${project.version}</finalName> | |
<filters> | |
<filter> | |
<artifact>*:*</artifact> | |
<excludes> | |
<exclude>META-INF/LICENSE</exclude> | |
<exclude>META-INF/*.SF</exclude> | |
<exclude>META-INF/*.DSA</exclude> | |
<exclude>META-INF/*.RSA</exclude> | |
</excludes> | |
</filter> | |
</filters> | |
<transformers> | |
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> | |
</transformers> | |
</configuration> | |
</execution> | |
</executions> | |
</plugin> | |
</plugins> | |
<pluginManagement> | |
<plugins> | |
<plugin> | |
<groupId>org.codehaus.mojo</groupId> | |
<artifactId>exec-maven-plugin</artifactId> | |
<version>${maven-exec-plugin.version}</version> | |
<configuration> | |
<cleanupDaemonThreads>false</cleanupDaemonThreads> | |
</configuration> | |
</plugin> | |
</plugins> | |
</pluginManagement> | |
</build> | |
<profiles> | |
<profile> | |
<id>direct-runner</id> | |
<activation> | |
<activeByDefault>true</activeByDefault> | |
</activation> | |
<!-- Makes the DirectRunner available when running a pipeline. --> | |
<dependencies> | |
<dependency> | |
<groupId>org.apache.beam</groupId> | |
<artifactId>beam-runners-direct-java</artifactId> | |
<version>${beam.version}</version> | |
<scope>runtime</scope> | |
</dependency> | |
</dependencies> | |
</profile> | |
<profile> | |
<id>portable-runner</id> | |
<activation> | |
<activeByDefault>true</activeByDefault> | |
</activation> | |
<!-- Makes the PortableRunner available when running a pipeline. --> | |
<dependencies> | |
<dependency> | |
<groupId>org.apache.beam</groupId> | |
<artifactId>beam-runners-reference-java</artifactId> | |
<version>${beam.version}</version> | |
<scope>runtime</scope> | |
</dependency> | |
</dependencies> | |
</profile> | |
<profile> | |
<id>apex-runner</id> | |
<!-- Makes the ApexRunner available when running a pipeline. --> | |
<dependencies> | |
<dependency> | |
<groupId>org.apache.beam</groupId> | |
<artifactId>beam-runners-apex</artifactId> | |
<version>${beam.version}</version> | |
<scope>runtime</scope> | |
</dependency> | |
<!-- | |
Apex depends on httpclient version 4.3.6, project has a transitive dependency to httpclient 4.0.1 from | |
google-http-client. Apex dependency version being specified explicitly so that it gets picked up. This | |
can be removed when the project no longer has a dependency on a different httpclient version. | |
--> | |
<dependency> | |
<groupId>org.apache.httpcomponents</groupId> | |
<artifactId>httpclient</artifactId> | |
<version>4.3.6</version> | |
<scope>runtime</scope> | |
<exclusions> | |
<exclusion> | |
<groupId>commons-codec</groupId> | |
<artifactId>commons-codec</artifactId> | |
</exclusion> | |
</exclusions> | |
</dependency> | |
<!-- | |
Apex 3.6 is built against YARN 2.6. Version in the fat jar has to match | |
what's on the cluster, hence we need to repeat the Apex Hadoop dependencies here. | |
--> | |
<dependency> | |
<groupId>org.apache.hadoop</groupId> | |
<artifactId>hadoop-yarn-client</artifactId> | |
<version>${hadoop.version}</version> | |
<scope>runtime</scope> | |
</dependency> | |
<dependency> | |
<groupId>org.apache.hadoop</groupId> | |
<artifactId>hadoop-common</artifactId> | |
<version>${hadoop.version}</version> | |
<scope>runtime</scope> | |
</dependency> | |
</dependencies> | |
</profile> | |
<profile> | |
<id>dataflow-runner</id> | |
<!-- Makes the DataflowRunner available when running a pipeline. --> | |
<dependencies> | |
<dependency> | |
<groupId>org.apache.beam</groupId> | |
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId> | |
<version>${beam.version}</version> | |
<scope>runtime</scope> | |
</dependency> | |
</dependencies> | |
</profile> | |
<profile> | |
<id>flink-runner</id> | |
<!-- Makes the FlinkRunner available when running a pipeline. --> | |
<dependencies> | |
<dependency> | |
<groupId>org.apache.beam</groupId> | |
<artifactId>beam-runners-flink_2.11</artifactId> | |
<version>${beam.version}</version> | |
<scope>runtime</scope> | |
</dependency> | |
</dependencies> | |
</profile> | |
<profile> | |
<id>spark-runner</id> | |
<!-- Makes the SparkRunner available when running a pipeline. Additionally, | |
overrides some Spark dependencies to Beam-compatible versions. --> | |
<properties> | |
<netty.version>4.1.17.Final</netty.version> | |
</properties> | |
<dependencies> | |
<dependency> | |
<groupId>org.apache.beam</groupId> | |
<artifactId>beam-runners-spark</artifactId> | |
<version>${beam.version}</version> | |
<scope>runtime</scope> | |
</dependency> | |
<dependency> | |
<groupId>org.apache.beam</groupId> | |
<artifactId>beam-sdks-java-io-hadoop-file-system</artifactId> | |
<version>${beam.version}</version> | |
<scope>runtime</scope> | |
</dependency> | |
<dependency> | |
<groupId>org.apache.spark</groupId> | |
<artifactId>spark-streaming_2.11</artifactId> | |
<version>${spark.version}</version> | |
<scope>runtime</scope> | |
<exclusions> | |
<exclusion> | |
<groupId>org.slf4j</groupId> | |
<artifactId>jul-to-slf4j</artifactId> | |
</exclusion> | |
</exclusions> | |
</dependency> | |
<dependency> | |
<groupId>com.fasterxml.jackson.module</groupId> | |
<artifactId>jackson-module-scala_2.11</artifactId> | |
<version>${jackson.version}</version> | |
<scope>runtime</scope> | |
</dependency> | |
<!-- [BEAM-3519] GCP IO exposes netty on its API surface, causing conflicts with runners --> | |
<dependency> | |
<groupId>org.apache.beam</groupId> | |
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId> | |
<version>${beam.version}</version> | |
<exclusions> | |
<exclusion> | |
<groupId>io.grpc</groupId> | |
<artifactId>grpc-netty</artifactId> | |
</exclusion> | |
<exclusion> | |
<groupId>io.netty</groupId> | |
<artifactId>netty-handler</artifactId> | |
</exclusion> | |
</exclusions> | |
</dependency> | |
</dependencies> | |
</profile> | |
<profile> | |
<id>gearpump-runner</id> | |
<dependencies> | |
<dependency> | |
<groupId>org.apache.beam</groupId> | |
<artifactId>beam-runners-gearpump</artifactId> | |
<version>${beam.version}</version> | |
<scope>runtime</scope> | |
</dependency> | |
</dependencies> | |
</profile> | |
<profile> | |
<id>samza-runner</id> | |
<dependencies> | |
<dependency> | |
<groupId>org.apache.beam</groupId> | |
<artifactId>beam-runners-samza</artifactId> | |
<version>${beam.version}</version> | |
<scope>runtime</scope> | |
</dependency> | |
</dependencies> | |
</profile> | |
<profile> | |
<id>nemo-runner</id> | |
<dependencies> | |
<dependency> | |
<groupId>org.apache.nemo</groupId> | |
<artifactId>nemo-compiler-frontend-beam</artifactId> | |
<version>${nemo.version}</version> | |
</dependency> | |
<dependency> | |
<groupId>org.apache.hadoop</groupId> | |
<artifactId>hadoop-common</artifactId> | |
<version>${hadoop.version}</version> | |
<exclusions> | |
<exclusion> | |
<groupId>org.slf4j</groupId> | |
<artifactId>slf4j-api</artifactId> | |
</exclusion> | |
<exclusion> | |
<groupId>org.slf4j</groupId> | |
<artifactId>slf4j-log4j12</artifactId> | |
</exclusion> | |
</exclusions> | |
</dependency> | |
</dependencies> | |
</profile> | |
</profiles> | |
<dependencies> | |
<!-- Adds a dependency on the Beam SDK. --> | |
<dependency> | |
<groupId>org.apache.beam</groupId> | |
<artifactId>beam-sdks-java-core</artifactId> | |
<version>${beam.version}</version> | |
</dependency> | |
<!-- Adds a dependency on the Beam Google Cloud Platform IO module. --> | |
<dependency> | |
<groupId>org.apache.beam</groupId> | |
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId> | |
<version>${beam.version}</version> | |
</dependency> | |
<!-- Dependencies below this line are specific dependencies needed by the examples code. --> | |
<dependency> | |
<groupId>com.google.api-client</groupId> | |
<artifactId>google-api-client</artifactId> | |
<version>${google-clients.version}</version> | |
<exclusions> | |
<!-- Exclude an old version of guava that is being pulled | |
in by a transitive dependency of google-api-client --> | |
<exclusion> | |
<groupId>com.google.guava</groupId> | |
<artifactId>guava-jdk5</artifactId> | |
</exclusion> | |
</exclusions> | |
</dependency> | |
<dependency> | |
<groupId>com.google.apis</groupId> | |
<artifactId>google-api-services-bigquery</artifactId> | |
<version>${bigquery.version}</version> | |
<exclusions> | |
<!-- Exclude an old version of guava that is being pulled | |
in by a transitive dependency of google-api-client --> | |
<exclusion> | |
<groupId>com.google.guava</groupId> | |
<artifactId>guava-jdk5</artifactId> | |
</exclusion> | |
</exclusions> | |
</dependency> | |
<dependency> | |
<groupId>com.google.http-client</groupId> | |
<artifactId>google-http-client</artifactId> | |
<version>${google-clients.version}</version> | |
<exclusions> | |
<!-- Exclude an old version of guava that is being pulled | |
in by a transitive dependency of google-api-client --> | |
<exclusion> | |
<groupId>com.google.guava</groupId> | |
<artifactId>guava-jdk5</artifactId> | |
</exclusion> | |
</exclusions> | |
</dependency> | |
<dependency> | |
<groupId>com.google.apis</groupId> | |
<artifactId>google-api-services-pubsub</artifactId> | |
<version>${pubsub.version}</version> | |
<exclusions> | |
<!-- Exclude an old version of guava that is being pulled | |
in by a transitive dependency of google-api-client --> | |
<exclusion> | |
<groupId>com.google.guava</groupId> | |
<artifactId>guava-jdk5</artifactId> | |
</exclusion> | |
</exclusions> | |
</dependency> | |
<dependency> | |
<groupId>joda-time</groupId> | |
<artifactId>joda-time</artifactId> | |
<version>${joda.version}</version> | |
</dependency> | |
<dependency> | |
<groupId>com.google.guava</groupId> | |
<artifactId>guava</artifactId> | |
<version>${guava.version}</version> | |
</dependency> | |
<!-- Add slf4j API frontend binding with JUL backend --> | |
<dependency> | |
<groupId>org.slf4j</groupId> | |
<artifactId>slf4j-api</artifactId> | |
<version>${slf4j.version}</version> | |
</dependency> | |
<dependency> | |
<groupId>org.slf4j</groupId> | |
<artifactId>slf4j-jdk14</artifactId> | |
<version>${slf4j.version}</version> | |
<!-- When loaded at runtime this will wire up slf4j to the JUL backend --> | |
<scope>runtime</scope> | |
</dependency> | |
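<!-- Hamcrest and JUnit are required dependencies of PAssert.
NOTE(review): the DebuggingWordCount example this note originally referred to is not among
the sources shown with this POM, and CombineFnLogger does not use PAssert; confirm whether
these compile-scope dependencies are still needed. -->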
<dependency> | |
<groupId>org.hamcrest</groupId> | |
<artifactId>hamcrest-core</artifactId> | |
<version>${hamcrest.version}</version> | |
</dependency> | |
<dependency> | |
<groupId>org.hamcrest</groupId> | |
<artifactId>hamcrest-library</artifactId> | |
<version>${hamcrest.version}</version> | |
</dependency> | |
<dependency> | |
<groupId>junit</groupId> | |
<artifactId>junit</artifactId> | |
<version>${junit.version}</version> | |
</dependency> | |
<!-- The DirectRunner is needed for unit tests. --> | |
<dependency> | |
<groupId>org.apache.beam</groupId> | |
<artifactId>beam-runners-direct-java</artifactId> | |
<version>${beam.version}</version> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>org.mockito</groupId> | |
<artifactId>mockito-core</artifactId> | |
<version>${mockito.version}</version> | |
<scope>test</scope> | |
</dependency> | |
</dependencies> | |
</project> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Log not visible in Cloud Console:

Log visible in Stackdriver Logging: