Skip to content

Instantly share code, notes, and snippets.

class MyBaseOperator(BaseOperator):
TASK_ID = 'BUILD_ID_TASK'
PIPELINE_BUILD_ID_KEY = 'BUILD_ID'
@apply_defaults
def __init__(self, *args, **kwargs):
super(MyBaseOperator, self).__init__(*args, **kwargs)
def execute(self, context):
public class ScriptFactory{
private final Map<String, Script> cache = new HashMap<>()
private static final GroovyClassLoader CLASS_LOADER = new GroovyClassLoader()
private String template
/**
@param source is a dynamic expression to be evaluated: user.attr1 == 'x' || user.attr2 == 'y'
@return instance of script
*/
one_day_ago = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
default_args = {
'owner': 'airflow',
'start_date': one_day_ago
}
dag = DAG(dag_id='my_pipeline', default_args=default_args)
generate_build_id = BuildIdOperator(dag=dag)
@Test
public void testRunAndDumpMetricsToConsole() throws Exception{
List<Users> users = readUsers();
reporter.start(10, TimeUnit.SECONDS);
IntStream.range(1, 20).forEach(i-> {
for(User user : users){
for(Trigger trigger: triggers){
if(System.getProperty("play", "notSet").equals("play")) {
test("Test generate sequential jobs") {
val xml = RunWorkflow
.getXMLString(new RunSequentialJobsWorkflow()
.createPipeline("com.job.A,com.job.B,com.job.C")) // naive way to interpolate ${JobsList}
// there are over 9999 elegant ways to do it in Scala
println(xml)
xml.length should be > 0
}
<workflow-app name="Variable list of sequential jobs" xmlns="uri:oozie:workflow:0.2">
<start to="java_A"/>
<action name="java_A">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>com.job.A</main-class>
</java>
<ok to="java_B"/>
<error to="kill"/>