Created
November 25, 2019 16:08
-
-
Save jsanta/d386a3f908243070c3a6ec9f2a75531b to your computer and use it in GitHub Desktop.
Postgres stream example using Express + Sequelize
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // jshint esversion: 6 | |
| /** | |
| * El comentario jshint es para indicar al parser de javascript del editor que se utilizarán | |
| * características de ES6 (EcmaScript 6), como funciones de flecha, const, let, y otros. | |
| */ | |
| /** | |
| * Requeridos para poder definir la API | |
| * - express: permite definir los endpoints de la aplicación | |
| * - cors: permite que los endpoints de la API sean accesibles desde servidores externos | |
| * - bodyparser: simplifica el rescate de parametros de los endpoints de la API | |
| * - router: encargado de la gestión de rutas (endpoints) de la aplicación | |
| */ | |
| const express = require('express'); | |
| const cors = require('cors'); | |
| const bodyParser = require('body-parser'); | |
| const router = express.Router(); | |
| /** | |
| * La configuración de la aplicación debe manejarse en un archivo externo. | |
| */ | |
| const config = require('./config'); | |
| /** | |
| * Define la aplicación express, y define un servidor HTTP que la va a utilizar. | |
| * Nótese que se utiliza let en vez de const para definir la variable app. Esto es ya que app va | |
| * a cambiar en su valor en otras partes del programa. | |
| */ | |
| let app = express(); | |
| const http = require('http').Server(app); | |
| /** | |
| * Se le indica a la aplicación utilizar CORS para permitir llamadas a la API desde distintos | |
| * orígenes. CORS permite la definición de ciertos parámetros, entre ellos los relativos a las | |
| * cabeceras de las peticiones HTTP, y los métodos que se van a admitir. | |
| * Si no se pasa ningún objeto como parámetro CORS va a utilizar la configuración por defecto de | |
| * la librería. | |
| */ | |
| app.use(cors({ | |
| origin: true, | |
| methods: [ 'GET', 'POST', 'OPTIONS', 'PUT', 'DELETE' ], | |
| allowedHeaders: [ 'Content-Type', 'Authorization', 'X-Requested-With' ], | |
| exposedHeaders: [ 'X-Token', 'X-RefreshToken' ], | |
| preflightContinue: false, | |
| optionsSuccessStatus: 204 | |
| })); | |
| /** | |
| * Body-Parser es una librería de tipo middleware que permite a express un manejo simplificado de los | |
| * parámetros que recibe cada petición HTTP. | |
| */ | |
| app.use(bodyParser.json()); | |
| app.use(bodyParser.urlencoded({ | |
| extended: false | |
| })); | |
| /** | |
| * Setea el puerto donde va a estar escuchando el servidor. Obtiene esta información desde el archivo de | |
| * configuración. | |
| */ | |
| const ipAddress = config.APP_IP; | |
| const port = config.APP_PORT; | |
| app.set('port', port); | |
| /** | |
| * Inicializa el modelo de la base de datos y realiza la carga inicial de datos | |
| * (si es necesario) | |
| */ | |
| // const initDB = require('./model'); | |
| // initDB(); | |
| /** | |
| * AQUI SE DEFINE LA API | |
| * Esta definición puede ser directamente en esta sección, pero si la API comenzara a crecer, el archivo | |
| * también crecerá proporcionalmente. Para efectos de mantención y separación de responsabilidades es mejor | |
| * manejar la lógica de la API en archivos separados. | |
| */ | |
| app.use('/api', require('./streamApi')(router)); | |
| /** | |
| * Se define y ejecuta el servidor HTTP con la aplicación Express. | |
| * Esto podría hacerse derechamente con app.listen(port), pero hay casos donde se requiere mayor control | |
| * sobre el objeto servidor y las funciones que provee (por ejemplo monitorear los sockets de conexión). | |
| */ | |
| const server = http.listen(app.get('port'), ipAddress, () => { | |
| console.log('Server started on localhost:' + port + '; Press Ctrl-C to terminate.'); | |
| console.log('Application worker ', process.pid, ' started...'); | |
| }); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| const QueryStream = require('pg-query-stream'); | |
| const through2 = require('through2'); | |
| const pump = require('pump'); | |
| // Ref.: https://github.com/staeco/iris-ql/blob/master/src/util/export.js | |
| class SequelizeStream { | |
| constructor(pSequelize, pSql, pProcessTransform, pProcessEnd) { | |
| this.batchSize = 16; | |
| this.sequelize = pSequelize; | |
| this.sql = pSql; | |
| this.processTransform = pProcessTransform; | |
| this.processEnd = pProcessEnd; | |
| this.conn = undefined; | |
| this.query = undefined; | |
| } | |
| end(err) { | |
| const self = this; | |
| console.log('Sequelize stream end()'); | |
| self.query.close(() => { | |
| self.sequelize.connectionManager.releaseConnection(self.conn) | |
| .then(() => null) | |
| .catch(() => null) | |
| }); | |
| if (err) { | |
| console.warn('Warning: ', err); | |
| } | |
| console.log('Calling ProcessEnd'); | |
| self.processEnd(); | |
| } | |
| async init() { | |
| console.log('Sequelize stream init()'); | |
| this.conn = await (this.sequelize).connectionManager.getConnection({ | |
| type: 'SELECT' | |
| }); | |
| this.query = (this.conn).query(new QueryStream(this.sql, undefined, { | |
| batchSize: this.batchSize, | |
| types: { | |
| getTypeParser: (this.conn).getTypeParser.bind(this.conn) | |
| } | |
| })); | |
| } | |
| async stream() { | |
| await this.init(); | |
| const modifier = through2.obj((obj, _, cb) => { | |
| try { | |
| cb(null, this.processTransform(obj)); | |
| } catch (ex) { | |
| console.error('Could not execute transform', ex); | |
| this.end(); | |
| } | |
| }); | |
| let out = pump(this.query, modifier, (err) => this.end(err)); | |
| return out; | |
| } | |
| } | |
| module.exports = SequelizeStream; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| const SequelizeStream = require('./sequelize-stream'); | |
| const { sequelize } = require('./config/dbConfig'); // Refer to API starter project | |
| const sql = `SELECT * FROM errors_dashboard.errors AS e`; | |
| module.exports = (router) => { | |
| router.get('/xstream', async (req, res, next) => { | |
| res.writeHead(200, { | |
| 'Content-Type': 'application/json', | |
| 'Transfer-Encoding': 'chunked' | |
| }); | |
| const seqStream = new SequelizeStream(sequelize, sql, | |
| (obj) => { res.write(JSON.stringify(obj) + '\n'); }, | |
| () => res.end()); | |
| seqStream.stream(); | |
| req.on('close', () => { | |
| console.warn('Request interrupted!'); | |
| try { seqStream.end('Request interrupted!'); } | |
| catch (ex) { | |
| console.error(ex); | |
| res.end(); | |
| } | |
| }); | |
| }); | |
| return router; | |
| }; |
Author
@jsanta thanks for getting back to me, I don't know if you know how AWS RDS works you can create group parameters and share them across, our staging and production DB share the same group parameters, but somehow I might need to look deeper on the pool connection because we using default pool connection. i will need to modify it and see if it can resolve my issues
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi @reaganscofield .
Line 23 should be required to avoid having an open connection that lives forever, even if it's not closed.
Have no idea why it works on your staging and not on your production. My first suspect is some difference in you staging and production connection parameters (happened to us once, we where using a fixed connection pool with some timeouts configured on production and staging was not).
Hope it helps. Best regards and stay safe.