Created
July 14, 2024 09:54
-
-
Save yuwtennis/9e07894e2bfd692161f7203b2772d7cf to your computer and use it in GitHub Desktop.
Practical example for setting custom options as beamTestPipelineOptions for TestPipeline in apache beam
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public interface DagOptions extends PipelineOptions { | |
@Description("Dag options") | |
@Default.String("HELLOWORLD") | |
String getDagType(); | |
void setDagType(String dagType); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.example.dags; | |
import org.apache.beam.sdk.testing.NeedsRunner; | |
import org.apache.beam.sdk.testing.TestPipeline; | |
import org.apache.beam.sdk.testing.TestPipelineOptions; | |
import org.example.App; | |
import org.junit.Before; | |
import org.junit.Rule; | |
import org.junit.Test; | |
import org.junit.experimental.categories.Category; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.List; | |
import static org.example.Utils.asJsonStr; | |
import static org.junit.Assert.assertTrue; | |
public class HelloWorldDagTest { | |
public interface TestHWOptions extends TestPipelineOptions, App.DagOptions {} | |
private List<String> input; | |
private String dagType; | |
@Rule | |
public final transient TestPipeline p = TestPipeline.create(); | |
@Before | |
public void setUp() { | |
// Test pipeline options as JSON array. | |
// I have used Jackson to safely convert Java object into JSON string | |
// https://beam.apache.org/releases/javadoc/2.57.0/org/apache/beam/sdk/testing/TestPipeline.html | |
// https://www.json.org/json-en.html | |
String[] pOpts = new String[]{ | |
"--runner=DirectRunner" | |
}; | |
System.setProperty("beamTestPipelineOptions", asJsonStr(pOpts)); | |
input = new ArrayList<String>(Arrays.asList("Hello", "World.")); | |
dagType = "HELLOWORLD"; | |
} | |
@Test | |
@Category(NeedsRunner.class) | |
public void testProcess() { | |
p | |
.getOptions() | |
.as(App.DagOptions.class) | |
.setDagType(dagType); | |
p.run(); | |
assertTrue(true); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.example; | |
import com.fasterxml.jackson.core.JsonProcessingException; | |
import com.fasterxml.jackson.databind.ObjectMapper; | |
import java.util.ArrayList; | |
public class Utils { | |
/*** | |
* Takes String array and convert into JSON string | |
* @param arr as String array | |
* @return Json string | |
*/ | |
public static String asJsonStr(String[] arr) { | |
ObjectMapper objectMapper = new ObjectMapper(); | |
try { | |
return objectMapper.writeValueAsString(arr); | |
} catch (JsonProcessingException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment