Skip to content

Instantly share code, notes, and snippets.

@jsanta
Created November 25, 2019 16:08
Show Gist options
  • Select an option

  • Save jsanta/d386a3f908243070c3a6ec9f2a75531b to your computer and use it in GitHub Desktop.

Select an option

Save jsanta/d386a3f908243070c3a6ec9f2a75531b to your computer and use it in GitHub Desktop.
Postgres stream example using Express + Sequelize
// jshint esversion: 6
/**
* El comentario jshint es para indicar al parser de javascript del editor que se utilizarán
* características de ES6 (EcmaScript 6), como funciones de flecha, const, let, y otros.
*/
/**
* Requeridos para poder definir la API
* - express: permite definir los endpoints de la aplicación
* - cors: permite que los endpoints de la API sean accesibles desde servidores externos
* - bodyparser: simplifica el rescate de parametros de los endpoints de la API
* - router: encargado de la gestión de rutas (endpoints) de la aplicación
*/
const express = require('express');
const cors = require('cors');
const bodyParser = require('body-parser');
const router = express.Router();
/**
* La configuración de la aplicación debe manejarse en un archivo externo.
*/
const config = require('./config');
/**
* Define la aplicación express, y define un servidor HTTP que la va a utilizar.
* Nótese que se utiliza let en vez de const para definir la variable app. Esto es ya que app va
* a cambiar en su valor en otras partes del programa.
*/
let app = express();
const http = require('http').Server(app);
/**
* Se le indica a la aplicación utilizar CORS para permitir llamadas a la API desde distintos
* orígenes. CORS permite la definición de ciertos parámetros, entre ellos los relativos a las
* cabeceras de las peticiones HTTP, y los métodos que se van a admitir.
* Si no se pasa ningún objeto como parámetro CORS va a utilizar la configuración por defecto de
* la librería.
*/
app.use(cors({
origin: true,
methods: [ 'GET', 'POST', 'OPTIONS', 'PUT', 'DELETE' ],
allowedHeaders: [ 'Content-Type', 'Authorization', 'X-Requested-With' ],
exposedHeaders: [ 'X-Token', 'X-RefreshToken' ],
preflightContinue: false,
optionsSuccessStatus: 204
}));
/**
* Body-Parser es una librería de tipo middleware que permite a express un manejo simplificado de los
* parámetros que recibe cada petición HTTP.
*/
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({
extended: false
}));
/**
* Setea el puerto donde va a estar escuchando el servidor. Obtiene esta información desde el archivo de
* configuración.
*/
const ipAddress = config.APP_IP;
const port = config.APP_PORT;
app.set('port', port);
/**
* Inicializa el modelo de la base de datos y realiza la carga inicial de datos
* (si es necesario)
*/
// const initDB = require('./model');
// initDB();
/**
* AQUI SE DEFINE LA API
* Esta definición puede ser directamente en esta sección, pero si la API comenzara a crecer, el archivo
* también crecerá proporcionalmente. Para efectos de mantención y separación de responsabilidades es mejor
* manejar la lógica de la API en archivos separados.
*/
app.use('/api', require('./streamApi')(router));
/**
* Se define y ejecuta el servidor HTTP con la aplicación Express.
* Esto podría hacerse derechamente con app.listen(port), pero hay casos donde se requiere mayor control
* sobre el objeto servidor y las funciones que provee (por ejemplo monitorear los sockets de conexión).
*/
const server = http.listen(app.get('port'), ipAddress, () => {
console.log('Server started on localhost:' + port + '; Press Ctrl-C to terminate.');
console.log('Application worker ', process.pid, ' started...');
});
const QueryStream = require('pg-query-stream');
const through2 = require('through2');
const pump = require('pump');
// Ref.: https://github.com/staeco/iris-ql/blob/master/src/util/export.js
class SequelizeStream {
constructor(pSequelize, pSql, pProcessTransform, pProcessEnd) {
this.batchSize = 16;
this.sequelize = pSequelize;
this.sql = pSql;
this.processTransform = pProcessTransform;
this.processEnd = pProcessEnd;
this.conn = undefined;
this.query = undefined;
}
end(err) {
const self = this;
console.log('Sequelize stream end()');
self.query.close(() => {
self.sequelize.connectionManager.releaseConnection(self.conn)
.then(() => null)
.catch(() => null)
});
if (err) {
console.warn('Warning: ', err);
}
console.log('Calling ProcessEnd');
self.processEnd();
}
async init() {
console.log('Sequelize stream init()');
this.conn = await (this.sequelize).connectionManager.getConnection({
type: 'SELECT'
});
this.query = (this.conn).query(new QueryStream(this.sql, undefined, {
batchSize: this.batchSize,
types: {
getTypeParser: (this.conn).getTypeParser.bind(this.conn)
}
}));
}
async stream() {
await this.init();
const modifier = through2.obj((obj, _, cb) => {
try {
cb(null, this.processTransform(obj));
} catch (ex) {
console.error('Could not execute transform', ex);
this.end();
}
});
let out = pump(this.query, modifier, (err) => this.end(err));
return out;
}
}
module.exports = SequelizeStream;
const SequelizeStream = require('./sequelize-stream');
const { sequelize } = require('./config/dbConfig'); // Refer to API starter project
const sql = `SELECT * FROM errors_dashboard.errors AS e`;
module.exports = (router) => {
router.get('/xstream', async (req, res, next) => {
res.writeHead(200, {
'Content-Type': 'application/json',
'Transfer-Encoding': 'chunked'
});
const seqStream = new SequelizeStream(sequelize, sql,
(obj) => { res.write(JSON.stringify(obj) + '\n'); },
() => res.end());
seqStream.stream();
req.on('close', () => {
console.warn('Request interrupted!');
try { seqStream.end('Request interrupted!'); }
catch (ex) {
console.error(ex);
res.end();
}
});
});
return router;
};
@jsanta
Copy link
Author

jsanta commented Aug 30, 2021

I just implement your gist about a week ago but somehow this does works on our staging environment correctly but it does not work on production it's basically complaining about ResourceRequest times out, we using PostgreSQL on RDS do you have any idea what could be the cause of this ? and also the closing connection on line 23 it does works either is it required?

Hi @reaganscofield .
Line 23 should be required to avoid having an open connection that lives forever, even if it's not closed.
Have no idea why it works on your staging and not on your production. My first suspect is some difference in you staging and production connection parameters (happened to us once, we where using a fixed connection pool with some timeouts configured on production and staging was not).

Hope it helps. Best regards and stay safe.

@reaganscofield
Copy link

@jsanta thanks for getting back to me, I don't know if you know how AWS RDS works you can create group parameters and share them across, our staging and production DB share the same group parameters, but somehow I might need to look deeper on the pool connection because we using default pool connection. i will need to modify it and see if it can resolve my issues

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment