Skip to content

Instantly share code, notes, and snippets.

@yuwtennis
Created July 14, 2024 09:54
Show Gist options
  • Save yuwtennis/9e07894e2bfd692161f7203b2772d7cf to your computer and use it in GitHub Desktop.
Save yuwtennis/9e07894e2bfd692161f7203b2772d7cf to your computer and use it in GitHub Desktop.
Practical example of passing custom options to TestPipeline through the beamTestPipelineOptions system property in Apache Beam
/**
 * Pipeline options exposing a single string-valued {@code dagType} setting.
 */
public interface DagOptions extends PipelineOptions {

    /**
     * Returns the configured DAG type.
     *
     * <p>The {@code @Default.String} annotation supplies {@code "HELLOWORLD"} when no
     * value has been set.
     *
     * @return the DAG type
     */
    @Description("Dag options")
    @Default.String("HELLOWORLD")
    String getDagType();

    /**
     * Sets the DAG type.
     *
     * @param dagType the DAG type to store
     */
    void setDagType(String dagType);
}
package org.example.dags;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.example.App;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.example.Utils.asJsonStr;
import static org.junit.Assert.assertTrue;
/**
 * Demonstrates handing pipeline options to a {@link TestPipeline} through the
 * {@code beamTestPipelineOptions} system property and then setting a custom option
 * ({@link App.DagOptions}) on the resulting pipeline.
 */
public class HelloWorldDagTest {

    /** Options view combining the test-pipeline options with the application's custom options. */
    public interface TestHWOptions extends TestPipelineOptions, App.DagOptions {}

    /** Name of the system property that {@code TestPipeline.create()} reads its options from. */
    private static final String TEST_OPTIONS_PROPERTY = "beamTestPipelineOptions";

    // Sample input; assigned in setUp() but not consumed by any test in this class yet.
    private List<String> input;
    private String dagType;

    // The pipeline is built through a helper because field initializers run when JUnit
    // constructs the test instance, i.e. BEFORE any @Before method. Setting the system
    // property in setUp() would therefore be too late for TestPipeline.create() to see it.
    @Rule
    public final transient TestPipeline p = newTestPipeline();

    /**
     * Creates the {@link TestPipeline} with {@code beamTestPipelineOptions} in effect.
     *
     * <p>The property value must be a JSON array of command-line style arguments, so the
     * arguments are serialized with Jackson via {@code asJsonStr}.
     * See https://beam.apache.org/releases/javadoc/2.57.0/org/apache/beam/sdk/testing/TestPipeline.html
     * and https://www.json.org/json-en.html
     *
     * <p>The previous property value is restored afterwards so the setting does not leak
     * into other tests running in the same JVM.
     *
     * @return a test pipeline whose options were parsed from the property
     */
    private static TestPipeline newTestPipeline() {
        String[] pOpts = new String[]{
            "--runner=DirectRunner"
        };
        String previous = System.getProperty(TEST_OPTIONS_PROPERTY);
        System.setProperty(TEST_OPTIONS_PROPERTY, asJsonStr(pOpts));
        try {
            return TestPipeline.create();
        } finally {
            if (previous == null) {
                System.clearProperty(TEST_OPTIONS_PROPERTY);
            } else {
                System.setProperty(TEST_OPTIONS_PROPERTY, previous);
            }
        }
    }

    /** Initializes the per-test fixtures. */
    @Before
    public void setUp() {
        input = new ArrayList<>(Arrays.asList("Hello", "World."));
        dagType = "HELLOWORLD";
    }

    /**
     * Sets the custom option on the pipeline options, runs the (empty) pipeline and
     * verifies that both the property-provided runner and the custom option are in place.
     */
    @Test
    @Category(NeedsRunner.class)
    public void testProcess() {
        App.DagOptions options = p.getOptions().as(App.DagOptions.class);
        options.setDagType(dagType);

        p.run();

        assertTrue(
            "runner should come from beamTestPipelineOptions",
            "DirectRunner".equals(p.getOptions().getRunner().getSimpleName()));
        assertTrue(
            "dagType set on the pipeline options should be readable back",
            dagType.equals(options.getDagType()));
    }
}
package org.example;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
/** Small JSON helper functions. */
public class Utils {

    // Building an ObjectMapper is comparatively expensive and a configured instance can be
    // shared, so a single mapper is reused instead of creating one per call.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Serializes a String array into its JSON string representation.
     *
     * @param arr the String array to serialize
     * @return the JSON string produced by Jackson for {@code arr}
     * @throws RuntimeException if Jackson fails to serialize the array; the original
     *     {@link JsonProcessingException} is preserved as the cause
     */
    public static String asJsonStr(String[] arr) {
        try {
            return OBJECT_MAPPER.writeValueAsString(arr);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize String array to JSON", e);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment