Skip to content

Instantly share code, notes, and snippets.

@MarkusPfundstein
Created March 27, 2017 06:47
Show Gist options
  • Save MarkusPfundstein/246446e428ab10dbd2383cf3204a19d0 to your computer and use it in GitHub Desktop.
Save MarkusPfundstein/246446e428ab10dbd2383cf3204a19d0 to your computer and use it in GitHub Desktop.
JavaScript Reader.T(Task) example (why to use monads)
const Task = require('data.task');
const Maybe = require('data.maybe');
const { Reader, ReaderT } = require('ramda-fantasy');
const R = require('ramda');
const mysql = require('mysql');
const request = require('request');
// Reader transformer applied to Task. Throughout this file a ReaderTask is
// built from a function `env => Task` and executed with `.run(env)`, so the
// shared environment is supplied once at the edge of the program.
const ReaderTask = Reader.T(Task);
// Failure-side logger: forwards every argument to console.error, prefixed
// with a fixed marker string.
const logE = (...details) => {
  console.error('oh oh error', ...details);
};
// Success-side logger: forwards every argument unchanged to console.log.
const log = (...messages) => {
  console.log(...messages);
};
// First element of a list wrapped in Maybe: Just(first) when the list has at
// least one element, Nothing() when it is empty.
const safeHead = xs => {
  if (xs.length > 0) {
    return Maybe.Just(xs[0]);
  }
  return Maybe.Nothing();
};
// Static settings for the two back ends. The connection fields are empty
// placeholders in this file.
const config = {
  // Handed to mysql.createConnection by createEnvironment.
  sql: {
    host: '',
    user: '',
    password: '',
    database: '',
  },
  // Read when building the upload URL and when batching uploads.
  elasticSearch: {
    host: '',
    port: '',
    index: 'docs',
    batchUpload: {
      // Pause (milliseconds) taken before each batch is uploaded.
      delay: 250,
      // Number of documents per batch.
      size: 50,
    },
  },
};
// Builds the environment object that every ReaderTask in this file receives:
//   sqlConnection - a MySQL connection created from `config.sql`
//   elasticSearch - the Elasticsearch settings, passed through untouched
const createEnvironment = config => {
  const { sql, elasticSearch } = config;
  return {
    sqlConnection: mysql.createConnection(sql),
    elasticSearch,
  };
};
// True when the HTTP status code is exactly 200 (OK) or 201 (Created).
// Despite its name, `request` is the response object that the `request`
// library hands to its callback. The comparison is strict, so values that
// merely coerce to 200/201 (e.g. '200' or [201]) are not treated as success.
const checkRequestOk = request => request.statusCode === 200 || request.statusCode === 201;
// Uploads one publication document to Elasticsearch with an HTTP PUT.
// The target URL is built from env.elasticSearch (host, port, index) and the
// publication id. The resulting Task
//   - rejects with the transport error when the request itself fails,
//   - resolves with the response body when checkRequestOk accepts the response,
//   - rejects with the response body otherwise.
const putPublicationToElastic = publication => ReaderTask(env => {
  const publicationId = publication.id;
  return new Task((rej, res) => {
    log(`upload publication ${publication.id}`);
    const options = {
      url : `http://${env.elasticSearch.host}:${env.elasticSearch.port}/${env.elasticSearch.index}/external/${publicationId}`,
      method : 'PUT',
      json : publication
    };
    request(options, (error, response, body) => {
      if (error) {
        return rej(error);
      }
      if (checkRequestOk(response)) {
        return res(body);
      }
      return rej(body);
    });
  });
});
// Runs `query` on the environment's SQL connection.
// The resulting Task rejects with the driver error, or resolves with the
// result rows; the third callback argument (field metadata) is ignored.
const sqlQuery = query => ReaderTask(env => {
  const execute = (rej, res) => {
    const onFinished = (error, results) => {
      if (error) {
        return rej(error);
      }
      return res(results);
    };
    return env.sqlConnection.query(query, onFinished);
  };
  return new Task(execute);
});
// Ends the environment's SQL connection and then resolves with `data`, so it
// can be chained at the end of a pipeline without losing the pipeline result.
// Closing is best-effort: an error reported by `end` is logged through logE
// instead of being silently discarded, and the Task still resolves with `data`.
const closeConnection = data => ReaderTask(env =>
  new Task((rej, res) =>
    env.sqlConnection.end(error => {
      if (error) {
        logE('failed to close sql connection', error);
      }
      res(data);
    })));
// Looks up the text row that belongs to `file` (at most one row) and yields
// it as a Maybe: Just(row) when a row exists, Nothing otherwise.
// The id is passed through mysql.escape rather than interpolated raw, so a
// non-numeric or missing id can neither break nor alter the statement.
const tryGetFileTextFromFile = file =>
  sqlQuery(`select txt from files_text where file_id = ${mysql.escape(file.id)} limit 1`)
    .map(safeHead);
// Looks up the publication referenced by `file.publication_id` (at most one
// row) and yields it as a Maybe: Just(row) when found, Nothing otherwise.
// The id is passed through mysql.escape rather than interpolated raw, so a
// non-numeric or missing id can neither break nor alter the statement.
const tryGetPublicationFromFile = file =>
  sqlQuery(`select * from publications where id = ${mysql.escape(file.publication_id)} limit 1`)
    .map(safeHead);
// Curried builder for the document sent to Elasticsearch.
//   file             - row from `files` (id, abstract, title, type are copied)
//   maybeText        - Maybe of the text row; Nothing yields an empty object
//   maybePublication - Maybe of the publication row; only `publishdate` is
//                      read, falling back to null when absent or falsy
const createUploadDocument = R.curry((file, maybeText, maybePublication) => {
  const buildDocument = textRow => {
    const { publishdate } = maybePublication.getOrElse({});
    const { id, abstract, title, type } = file;
    return {
      id,
      abstract,
      publishDate: publishdate || null,
      title,
      type,
      text: textRow.txt,
    };
  };
  const emptyDocument = () => ({});
  return maybeText.cata({
    Nothing: emptyDocument,
    Just: buildDocument,
  });
});
// ReaderTask yielding one upload document per row of `files`.
// The rows are folded left to right: for each file the text row and the
// publication row are looked up and applied to createUploadDocument, and the
// resulting document is appended to the accumulated list.
const getAllTexts = (() => {
  // Seed of the fold: a ReaderTask that yields an empty list.
  const noDocuments = ReaderTask(_ => Task.of([]));

  // Builds the upload document for a single file.
  const documentFor = file =>
    ReaderTask.of(createUploadDocument(file))
      .ap(tryGetFileTextFromFile(file))
      .ap(tryGetPublicationFromFile(file));

  // Fold step: run what has been accumulated so far, then add this file's document.
  const appendDocument = (result, file) =>
    result.chain(xs =>
      documentFor(file).map(newDoc => [...xs, newDoc]));

  return sqlQuery(`select * from files`)
    .chain(R.reduce(appendDocument, noDocuments));
})();
// ReaderTask that resolves (with no value) once `delay` milliseconds have
// elapsed; it never rejects and does not read the environment.
const wait = delay => ReaderTask(_env => {
  const startTimer = (_rej, res) => setTimeout(res, delay);
  return new Task(startTimer);
});
// Turns a list of batches, e.g. [[pub1, ..., pub9], [pub10, ..., pub19], ...],
// into a single ReaderTask that uploads them one batch after another.
// Every batch (the first one included) is preceded by a pause of `delay`
// milliseconds. The result is the flat list of all upload responses, in order.
const uploadPubsWithDelay = delay => {
  // Seed of the fold: nothing uploaded yet.
  const nothingUploaded = ReaderTask(_ => Task.of([]));

  // Uploads every publication of one batch and collects the responses.
  const uploadBatch = batch =>
    R.traverse(ReaderTask.of, putPublicationToElastic, batch);

  // Fold step: finish earlier batches, pause, upload this batch, concatenate.
  const appendBatch = (result, next) =>
    result.chain(xs =>
      wait(delay).chain(_ =>
        uploadBatch(next).map(ys => [...xs, ...ys])));

  return R.reduce(appendBatch, nothingUploaded);
};
// Uploads `pubs` in batches. Batch size and the pause before each batch come
// from env.elasticSearch.batchUpload. The pause exists because, per the
// original author, Elasticsearch does not survive the upload without it.
const delayedBatchUpload = pubs => ReaderTask.ask.chain(env => {
  const { size, delay } = env.elasticSearch.batchUpload;
  // make batches
  const batches = R.splitEvery(size, pubs);
  // upload them with `delay` ms before every batch
  return uploadPubsWithDelay(delay)(batches);
});
// Whole pipeline as one Task: fetch all documents, drop those without text,
// upload the rest in delayed batches and report how many were inserted.
// The SQL connection is closed on BOTH outcomes: after a success the Task
// resolves with the summary string; after a failure the connection is closed
// first and the Task then rejects with the original error.
const runApp = env => {
  const close = value => closeConnection(value).run(env);
  return getAllTexts
    .map(R.tap(pubs => log(`fetched ${pubs.length} documents`)))
    // filter all publications without text
    .map(R.filter(pub => pub.text && pub.text.length > 0))
    .map(R.tap(pubs => log(`try to upload ${pubs.length} documents`)))
    .chain(delayedBatchUpload)
    .map(inserted => `inserted ${inserted.length} documents`)
    .run(env)
    // failure path: release the connection, then re-raise the same error
    .orElse(error => close(error).chain(() => Task.rejected(error)))
    // success path: release the connection, keep the summary
    .chain(close);
};
// Entry point: build the environment from the static config, run the whole
// pipeline, and report the outcome (failure -> logE, success -> log).
const appEnvironment = createEnvironment(config);
runApp(appEnvironment).fork(logE, log);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment