<workflow-app name="Variable list of sequential jobs" xmlns="uri:oozie:workflow:0.2">
    <start to="java_A"/>
    <action name="java_A">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <main-class>com.job.A</main-class>
        </java>
        <ok to="java_B"/>
        <error to="kill"/>
test("Test generate sequential jobs") {
  val xml = RunWorkflow
    .getXMLString(new RunSequentialJobsWorkflow()
      .createPipeline("com.job.A,com.job.B,com.job.C")) // naive way to interpolate ${JobsList}
  // there are over 9999 elegant ways to do it in Scala
  println(xml)
  xml.length should be > 0
}
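
The test above feeds a comma-separated list of main classes into the workflow template. As a rough illustration of what that interpolation has to produce, here is a minimal Python sketch that chains one `<java>` action per class. The function name `build_sequential_workflow`, the `kill` message, and the `end` node are assumptions made for illustration; the original template is truncated and its Scala generator is not shown.

```python
import xml.etree.ElementTree as ET


def build_sequential_workflow(jobs_list, name="Variable list of sequential jobs"):
    """Build an Oozie-style workflow that runs the given main classes in order."""
    classes = [c.strip() for c in jobs_list.split(",") if c.strip()]
    if not classes:
        raise ValueError("jobs_list must name at least one main class")

    # com.job.A -> java_A, matching the action naming used in the template
    action_names = ["java_" + c.rsplit(".", 1)[-1] for c in classes]

    root = ET.Element("workflow-app",
                      {"name": name, "xmlns": "uri:oozie:workflow:0.2"})
    ET.SubElement(root, "start", {"to": action_names[0]})

    for i, (action_name, main_class) in enumerate(zip(action_names, classes)):
        action = ET.SubElement(root, "action", {"name": action_name})
        java = ET.SubElement(action, "java")
        ET.SubElement(java, "job-tracker").text = "${jobTracker}"
        ET.SubElement(java, "name-node").text = "${nameNode}"
        ET.SubElement(java, "main-class").text = main_class
        # Each action hands over to the next one; the last goes to "end".
        is_last = i + 1 == len(action_names)
        next_node = "end" if is_last else action_names[i + 1]
        ET.SubElement(action, "ok", {"to": next_node})
        ET.SubElement(action, "error", {"to": "kill"})

    # Assumed terminal nodes; they are not visible in the truncated template.
    kill = ET.SubElement(root, "kill", {"name": "kill"})
    ET.SubElement(kill, "message").text = "Job failed"
    ET.SubElement(root, "end", {"name": "end"})
    return ET.tostring(root, encoding="unicode")


print(build_sequential_workflow("com.job.A,com.job.B,com.job.C"))
```

Building the document as a tree rather than by string concatenation keeps the `ok` transitions consistent with the action names when the list changes length.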
@Test
public void testRunAndDumpMetricsToConsole() throws Exception {
    List<User> users = readUsers();
    reporter.start(10, TimeUnit.SECONDS);
    IntStream.range(1, 20).forEach(i -> {
        for (User user : users) {
            for (Trigger trigger : triggers) {
                if (System.getProperty("play", "notSet").equals("play")) {
from datetime import datetime, timedelta

from airflow import DAG

# Midnight at the start of yesterday
one_day_ago = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
default_args = {
    'owner': 'airflow',
    'start_date': one_day_ago
}
dag = DAG(dag_id='my_pipeline', default_args=default_args)
generate_build_id = BuildIdOperator(dag=dag)
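
The `start_date` expression above evaluates to midnight at the start of the previous day. A minimal stdlib-only sketch of that calculation, assuming a hypothetical helper named `midnight_days_ago` that takes an injectable `now` so the result can be checked deterministically:

```python
from datetime import datetime, timedelta


def midnight_days_ago(days, now=None):
    """Return 00:00:00 on the calendar day `days` days before `now`."""
    now = now or datetime.today()
    # datetime.combine uses only the date part of its first argument
    return datetime.combine(now - timedelta(days), datetime.min.time())


fixed_now = datetime(2020, 3, 15, 17, 42, 9)
print(midnight_days_ago(1, now=fixed_now))  # prints 2020-03-14 00:00:00
```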
public class ScriptFactory {
    private final Map<String, Script> cache = new HashMap<>()
    private static final GroovyClassLoader CLASS_LOADER = new GroovyClassLoader()
    private String template

    /**
     * @param source is a dynamic expression to be evaluated: user.attr1 == 'x' || user.attr2 == 'y'
     * @return instance of script
     */
class MyBaseOperator(BaseOperator):
    TASK_ID = 'BUILD_ID_TASK'
    PIPELINE_BUILD_ID_KEY = 'BUILD_ID'

    @apply_defaults
    def __init__(self, *args, **kwargs):
        super(MyBaseOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
class UntarOperator(BashOperator):
    """
    Download and unpack artifact
    :param artifact_tar_gz_name: name of artifact from previous download step
    :type artifact_tar_gz_name: string
    :param lookup_task_id: id of the task that downloaded the artifact
    :type lookup_task_id: string
    :param password
// A builder with a single spout and a single bolt. The spout can emit tuples on two streams:
// user events, consumed via fieldsGrouping, with streamId UserEvent.class.getSimpleName()
// game events, consumed via allGrouping, with streamId GameEvent.class.getSimpleName()
builder.setSpout(GenericEventSpout.class.getSimpleName(), new GenericEventSpout(), 2);
builder.setBolt(UserContextBolt.class.getSimpleName(), new UserContextBolt(), 2)
        .addConfigurations(env)
        .fieldsGrouping(GenericEventSpout.class.getSimpleName(), UserEvent.class.getSimpleName(), new Fields(Const.userId.name()))
        .allGrouping(GenericEventSpout.class.getSimpleName(), GameEvent.class.getSimpleName());
// Here are the stream declarations in the spout:
def groupAndGenerateNewSurrogateKey: Pipe = {
  pipe.groupBy('naturalKey) { group =>
    group.mapStream[Long, (Long, Long)]('someValueField -> ('someValueField, 'newSurrogateKey)) { items: Iterator[Long] =>
      val newSurrogateKey = KeyGen.generate()
      // Materialize once: an Iterator can be traversed only once, so calling
      // toList for logging and then map on the same iterator would yield nothing.
      val itemList = items.toList
      println(s"new group key:[$newSurrogateKey]") // outputs generated key
      println(s"items: $itemList") // outputs grouped items
      itemList.map((_, newSurrogateKey))
    }
  }.project('someValueField, 'newSurrogateKey)
}
// Typical put implementation
public boolean put(SomeBean bean) {
    HTableInterface hTable = null;
    try {
        // The controller holds a reference to HConnection.
        // The connection is created once in servlet.init and shared across several controllers as a plain object reference.
        HConnection hConnection = getConnection();
        hTable = hConnection.getTable(getName());
        return hTable.checkAndPut(createKey(bean), CF_B, CQ_B, null, createPut(bean));
    }