Created
August 11, 2023 13:10
-
-
Save annibal/e01d1b06d074dfdd14b8f9bca0b46e5a to your computer and use it in GitHub Desktop.
JS Airflow
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Main "Lib" function
/**
 * Executes a flow of callbacks, passing inputs and storing outputs.
 *
 * Example:
 *   jsAirflow(
 *     [
 *       { fn: 'sum', in: ['v1', 'v2'], out: ['result'] },
 *       { fn: 'power', in: ['result', 'power'], out: ['result'] },
 *     ],
 *     obj,
 *     { v1: 1.7, v2: 1.3, power: 2 }
 *   )
 *
 * Starts with v1=1.7, v2=1.3, power=2.
 *
 * Calls sum() passing the values named in "in", in order, so sum(v1, v2):
 *   sum(1.7, 1.3); // returns { result: 3 }
 *
 * Stores result=3 in the internal data store, then calls
 *   power(data.result, data.power) = power(3, 2); // returns { result: 9 }
 *
 * Resolves with the accumulated data store, which in this case is
 *   { v1: 1.7, v2: 1.3, power: 2, result: 9 }
 *
 * A step of the form { fns: flowObject[] } starts all of its sub-steps
 * against the same snapshot of the data store, awaits them together, and
 * then merges their outputs in declaration order (so the result does not
 * depend on which sub-step happens to finish first).
 *
 * @param flow Array of flowObject={ fn: string, in: string[], out: string[] } or { fns: flowObject[] }.
 *   A missing "in" or "out" is treated as an empty list.
 * @param callbacks Object with the actual functions flowObject.fn will execute. Functions have to
 *   return (or resolve to) an object whose keys match the step's "out" names.
 * @param initialData Start with this object; it is copied, never mutated.
 * @param log When true (default), progress is written with console.log.
 * @returns Promise of an object holding initialData plus every "out" value produced by the flow.
 * @throws TypeError (as a rejection) when a step names a callback that is not a function.
 *   Errors thrown by callbacks reject the returned promise as well.
 */
async function jsAirflow(flow, callbacks, initialData, log = true) {
  // Lib Helper: console.log that is silenced when `log` is false.
  const maybeLog = (...args) => {
    if (log) console.log(...args);
  };

  // Lib Helper: reads the named keys from the data store, preserving order.
  const getInputsArray = (inputKeys, dataStore) =>
    inputKeys.map((inKey) => dataStore[inKey]);

  // Lib Helper: returns a copy of the data store with the named outputs
  // taken from a callback result.
  const addOutputsToDataStore = (currentDataStore, outputNames, resultObj) => {
    const newDataStore = { ...currentDataStore };
    for (const outKey of outputNames) {
      newDataStore[outKey] = resultObj[outKey];
    }
    return newDataStore;
  };

  // Lib Helper: looks up a callback and fails with a message that names the step.
  const resolveCallback = (fnName, label) => {
    const fn = callbacks[fnName];
    if (typeof fn !== 'function') {
      throw new TypeError(`jsAirflow step #${label}: callback "${fnName}" is not a function`);
    }
    return fn;
  };

  // Initial Values
  let dataStore = { ...initialData };

  for (let i = 0; i < flow.length; i += 1) {
    const step = flow[i];

    if (step.fns) {
      const subOutputs = [];
      // Each async mapper runs synchronously up to its first await, so every
      // sub-step reads its inputs from the same data store snapshot. A throw
      // inside a mapper becomes a rejection that Promise.all propagates,
      // instead of leaving a promise pending forever.
      const parallelFns = step.fns.map(async (subFlow, subFlowIdx) => {
        const { fn: subFnName, in: subIn = [], out: subOut = [] } = subFlow;
        const subFn = resolveCallback(subFnName, `${i}.${subFlowIdx}`);
        const subFnInput = getInputsArray(subIn, dataStore);
        subOutputs[subFlowIdx] = subOut;
        maybeLog(` #${i}.${subFlowIdx}: Calling sub ${subFnName}(${subIn.join(',')})`);
        maybeLog(subFnInput);
        const result = await subFn(...subFnInput);
        maybeLog(` ${subFnName}() result: `, result);
        return result;
      });
      maybeLog(parallelFns);
      const allResult = await Promise.all(parallelFns);
      maybeLog(allResult);
      // Merge only after everything settled, in declaration order.
      allResult.forEach((result, subFlowIdx) => {
        dataStore = addOutputsToDataStore(dataStore, subOutputs[subFlowIdx], result);
      });
    } else {
      const { fn: fnName, in: inputKeys = [], out: outputKeys = [] } = step;
      const fn = resolveCallback(fnName, String(i));
      const inputs = getInputsArray(inputKeys, dataStore);
      maybeLog(`#${i}: Calling ${fnName}(${inputKeys.join(',')})`);
      maybeLog(inputs);
      const result = await fn(...inputs);
      maybeLog(`${fnName}() result: `, result);
      dataStore = addOutputsToDataStore(dataStore, outputKeys, result);
    }
  }

  return dataStore;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Demo callbacks for jsAirflow. Each one is a stub that returns hard-coded
// (or random) data shaped as { <outName>: value }, so the keys line up with
// the "out" lists of the flow that uses them.
const airflowCallbacks = {};

/** Stub: ignores `url` and resolves with fixed credentials. */
airflowCallbacks.load_credentials = async function (url) {
  return { user: 'bolota', pass: 'xablau' };
};

/** Resolves with a token that is the base64 encoding of `user + pass`. */
airflowCallbacks.get_oauth_token = async function (user, pass) {
  return { token: btoa(user + pass) };
};

/** Stub: ignores `token` and resolves with a fixed character mapping table. */
airflowCallbacks.fetch_mappings = async function (token) {
  return { mappings: { '5': '_', '9': '-' } };
};

/** Stub: ignores `token` and resolves with fixed definitions. */
airflowCallbacks.fetch_definitions = async function (token) {
  return { definitions: { threshold: 0.2, min: 0, max: 1 } };
};

/** Stub: ignores `token` and resolves with a fixed user record. */
airflowCallbacks.fetch_user_data = async function (token) {
  return { user: { name: 'Bolota', age: 29 } };
};

/** Stub: ignores its arguments and resolves with 20 random integers in [0, 6 ** 10). */
airflowCallbacks.get_analytics_data = async function (token, user) {
  return {
    data: Array.from({ length: 20 }, () => Math.trunc(Math.random() * 6 ** 10)),
  };
};

/** Keeps only the even entries of `data`. */
airflowCallbacks.filter_only_analytics_valid = async function (data) {
  return { data: data.filter((x) => x % 2 === 0) };
};

/**
 * Turns every item into a string: the item is multiplied by
 * `definitions.threshold`, each character of the product is replaced through
 * `mappings` (characters with no truthy mapping are kept), and
 * `~<user.age>` is appended.
 */
airflowCallbacks.add_defs = async function (data, mappings, definitions, user) {
  return {
    data: data.map((item) => {
      const mapped = String(item * definitions.threshold)
        .split('')
        .map((ch) => mappings[ch] || ch)
        // Join explicitly: concatenating the array itself would insert commas.
        .join('');
      return `${mapped}~${user.age}`;
    }),
  };
};

/** Placeholder step: passes `data` through unchanged. */
airflowCallbacks.transform_add_threshold = async function (data) {
  return { data };
};

/** Placeholder step: passes `data` through unchanged. */
airflowCallbacks.transform_add_hierarchy = async function (data) {
  return { data };
};
// Flow definition for the demo run. Each entry names a callback ("fn"), the
// data-store keys passed to it in order ("in"), and the keys copied back from
// its result ("out"). An entry with "fns" groups sub-steps that are awaited
// together.
const airFlowSettings = [
  { fn: "load_credentials", in: ["url"], out: ["user", "pass"] },
  { fn: "get_oauth_token", in: ["user", "pass"], out: ["token"] },
  {
    // These three only need the token, so they are grouped.
    fns: [
      { fn: "fetch_mappings", in: ["token"], out: ["mappings"] },
      { fn: "fetch_definitions", in: ["token"], out: ["definitions"] },
      // Note: "user" is also an output of load_credentials; this replaces it.
      { fn: "fetch_user_data", in: ["token"], out: ["user"] },
    ],
  },
  { fn: "get_analytics_data", in: ["token", "user"], out: ["data"] },
  { fn: "filter_only_analytics_valid", in: ["data"], out: ["data"] },
  {
    fn: "add_defs",
    in: ["data", "mappings", "definitions", "user"],
    out: ["data"],
  },
  { fn: "transform_add_threshold", in: ["data"], out: ["data"] },
  { fn: "transform_add_hierarchy", in: ["data"], out: ["data"] },
];
// Demo entry point: runs the flow starting from { url: '/api/v1' } and prints
// the resulting data store. Failures are reported instead of being left as an
// unhandled promise rejection.
(async () => {
  try {
    const airflowResult = await jsAirflow(airFlowSettings, airflowCallbacks, { url: '/api/v1' });
    console.log(airflowResult);
  } catch (err) {
    console.error('jsAirflow demo failed:', err);
    // Signal failure to the shell when running under Node; `process` does not
    // exist in browsers.
    if (typeof process !== 'undefined') {
      process.exitCode = 1;
    }
  }
})();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment