Skip to content

Instantly share code, notes, and snippets.

@annibal
Created August 11, 2023 13:10
Show Gist options
  • Save annibal/e01d1b06d074dfdd14b8f9bca0b46e5a to your computer and use it in GitHub Desktop.
JS Airflow
// Main "Lib" function
/**
 * Executes a flow of callbacks, feeding each one inputs read from an internal
 * data store and writing its named outputs back into that store.
 *
 * Example:
 *   jsAirflow(
 *     [
 *       { fn: 'sum', in: ['v1', 'v2'], out: ['result'] },
 *       { fn: 'power', in: ['result', 'power'], out: ['result'] },
 *     ],
 *     callbacks,
 *     { v1: 1, v2: 2, power: 2 }
 *   )
 *
 * Starts with v1=1, v2=2, power=2.
 * Calls sum(v1, v2) = sum(1, 2), which returns { result: 3 }; result=3 is stored.
 * Calls power(result, power) = power(3, 2), which returns { result: 9 }.
 * Resolves with { v1: 1, v2: 2, power: 2, result: 9 }.
 *
 * A flow entry of the form { fns: flowObject[] } starts all of its sub-steps
 * before awaiting any of them; the flow continues once every sub-step is done.
 * Each sub-step reads its inputs when it is started and writes its outputs when
 * it finishes, so sub-steps of one group cannot consume each other's outputs.
 *
 * @param {Array<Object>} flow Array of flowObject = { fn: string, in: string[], out: string[] }
 *   or { fns: flowObject[] }. A missing `in` or `out` is treated as an empty list.
 * @param {Object} callbacks Object holding the functions that flowObject.fn refers to.
 *   Each function (sync or async) returns an object whose keys match the step's `out`.
 * @param {Object} [initialData] Initial content of the data store; it is copied, not mutated.
 * @param {boolean} [log=true] When true, progress is written with console.log.
 * @returns {Promise<Object>} The data store after the last step.
 * @throws {TypeError} (as a rejection) when a step names a callback that is not a
 *   function, or when a callback returns null/undefined although `out` is not empty.
 *   Errors thrown by callbacks also reject the returned promise.
 */
async function jsAirflow(flow, callbacks, initialData, log = true) {
  // Lib helper: console.log that can be silenced through the `log` flag.
  const maybeLog = (...args) => {
    if (log) console.log(...args);
  };

  // Lib helper: returns a copy of the store with the named outputs taken from resultObj.
  const addOutputsToDataStore = (currentDataStore, outputNames, resultObj, fnName) => {
    if (outputNames.length > 0 && resultObj == null) {
      throw new TypeError(
        `jsAirflow: callback "${fnName}" returned ${resultObj} but declares outputs [${outputNames.join(',')}]`
      );
    }
    const newDataStore = { ...currentDataStore };
    for (const outKey of outputNames) {
      newDataStore[outKey] = resultObj[outKey];
    }
    return newDataStore;
  };

  // Initial values
  let dataStore = { ...initialData };

  // Lib helper: runs a single flowObject against the current data store.
  // `callLabel` and `resultPrefix` only shape the log output.
  const runStep = async (
    { fn: fnName, in: inputKeys = [], out: outputKeys = [] },
    callLabel,
    resultPrefix
  ) => {
    if (typeof callbacks[fnName] !== 'function') {
      throw new TypeError(`jsAirflow: callback "${fnName}" is not a function`);
    }
    const inputs = inputKeys.map((inKey) => dataStore[inKey]);
    maybeLog(`${callLabel} ${fnName}(${inputKeys.join(',')})`);
    maybeLog(inputs);
    const result = await callbacks[fnName](...inputs);
    maybeLog(`${resultPrefix}${fnName}() result: `, result);
    // Merge into the store as it is *now* (after the await), so outputs written
    // by sibling parallel steps in the meantime are not lost.
    dataStore = addOutputsToDataStore(dataStore, outputKeys, result, fnName);
    return result;
  };

  for (let i = 0; i < flow.length; i += 1) {
    const step = flow[i];
    if (step.fns) {
      // Async functions already return promises; wrapping them in `new Promise`
      // would swallow rejections and leave Promise.all pending forever.
      const parallelFns = step.fns.map((subFlow, subFlowIdx) =>
        runStep(subFlow, ` #${i}.${subFlowIdx}: Calling sub`, ' ')
      );
      maybeLog(parallelFns);
      const allResult = await Promise.all(parallelFns);
      maybeLog(allResult);
    } else {
      await runStep(step, `#${i}: Calling`, '');
    }
  }
  return dataStore;
}
// Demo callbacks keyed by the step names used in the flow definition.
// Every callback is async and returns an object whose keys are its outputs.
const airflowCallbacks = {
  // Ignores `url` and returns fixed credentials.
  async load_credentials(url) {
    return { user: 'bolota', pass: 'xablau' };
  },

  // Produces a token by base64-encoding user and pass joined together.
  async get_oauth_token(user, pass) {
    const token = btoa(user + pass);
    return { token };
  },

  // Ignores `token` and returns a fixed character-replacement table.
  async fetch_mappings(token) {
    const mappings = { '5': '_', '9': '-' };
    return { mappings };
  },

  // Ignores `token` and returns fixed numeric definitions.
  async fetch_definitions(token) {
    const definitions = { threshold: 0.2, min: 0, max: 1 };
    return { definitions };
  },

  // Ignores `token` and returns a fixed user record.
  async fetch_user_data(token) {
    const user = { name: 'Bolota', age: 29 };
    return { user };
  },

  // Ignores its arguments and returns 20 random integers in [0, 6 ** 10).
  async get_analytics_data(token, user) {
    const randomInt = () => Math.floor(Math.random() * 6 ** 10);
    return { data: Array.from({ length: 20 }, randomInt) };
  },

  // Keeps only the even entries.
  async filter_only_analytics_valid(data) {
    const isEven = (value) => value % 2 === 0;
    return { data: data.filter(isEven) };
  },

  // Scales each item by definitions.threshold, splits the number's string form
  // into characters, swaps characters that have a truthy entry in `mappings`,
  // and appends '~' plus user.age. The characters end up comma-separated.
  async add_defs(data, mappings, definitions, user) {
    const encode = (item) => {
      const scaled = item * definitions.threshold;
      const chars = scaled.toString().split('');
      const swapped = chars.map((ch) => mappings[ch] || ch);
      return swapped.join(',') + '~' + user.age;
    };
    return { data: data.map((item) => encode(item)) };
  },

  // Pass-through placeholder: returns the data untouched.
  async transform_add_threshold(data) {
    return { data };
  },

  // Pass-through placeholder: returns the data untouched.
  async transform_add_hierarchy(data) {
    return { data };
  },
};
// Flow definition for the demo run. Each entry names a callback (`fn`), the
// data-store keys passed to it (`in`) and the keys it writes back (`out`).
// An entry with `fns` groups sub-steps that share one position in the flow.
const airFlowSettings = [
  {
    fn: 'load_credentials',
    in: ['url'],
    out: ['user', 'pass'],
  },
  {
    fn: 'get_oauth_token',
    in: ['user', 'pass'],
    out: ['token'],
  },
  {
    // All three only need the token.
    fns: [
      {
        fn: 'fetch_mappings',
        in: ['token'],
        out: ['mappings'],
      },
      {
        fn: 'fetch_definitions',
        in: ['token'],
        out: ['definitions'],
      },
      {
        // Writes `user`, replacing the value stored by load_credentials.
        fn: 'fetch_user_data',
        in: ['token'],
        out: ['user'],
      },
    ],
  },
  {
    fn: 'get_analytics_data',
    in: ['token', 'user'],
    out: ['data'],
  },
  {
    fn: 'filter_only_analytics_valid',
    in: ['data'],
    out: ['data'],
  },
  {
    fn: 'add_defs',
    in: ['data', 'mappings', 'definitions', 'user'],
    out: ['data'],
  },
  {
    fn: 'transform_add_threshold',
    in: ['data'],
    out: ['data'],
  },
  {
    fn: 'transform_add_hierarchy',
    in: ['data'],
    out: ['data'],
  },
];
// Demo entry point: runs the flow with the initial url and prints the final data store.
(async function runAirflowDemo() {
  const initialData = { url: '/api/v1' };
  console.log(await jsAirflow(airFlowSettings, airflowCallbacks, initialData));
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment