|  | import Stomp from 'stomp-client'; | 
        
          |  | import { activemq } from './config.json'; | 
        
          |  | import console from './console'; | 
        
          |  |  | 
        
          |  | const hostname = require('os').hostname(); | 
        
          |  |  | 
        
          |  | class QueueClient { | 
        
          |  | constructor(...queueNames) { | 
        
          |  | this.queueNames = queueNames; | 
        
          |  | this.client = new Stomp(activemq.host, activemq.port, activemq.username, activemq.password, '1.1'); | 
        
          |  | this.client.connect(this.connected.bind(this)); | 
        
          |  | } | 
        
          |  | inspect(job) { | 
        
          |  | const cloned = JSON.parse(JSON.stringify(job)); | 
        
          |  | ['expires', 'destination', 'subscription', 'priority', 'message-id', 'timestamp', 'redelivered'].forEach(i => delete cloned[i]); | 
        
          |  | return JSON.stringify(cloned); | 
        
          |  | } | 
        
          |  | connected() { | 
        
          |  | this.subscribe(); | 
        
          |  | } | 
        
          |  | unsubscribe(...queueNames) { | 
        
          |  | if (!queueNames.length) { | 
        
          |  | queueNames = this.queueNames; | 
        
          |  | } | 
        
          |  | queueNames.forEach( (queueName) => { | 
        
          |  | const subscriptionId = `${hostname}-${queueName}`; | 
        
          |  | const options = { 'activemq.prefetchSize': 1, ack: 'client-individual', id: subscriptionId }; | 
        
          |  | this.client.unsubscribe(`/queue/${activemq.queues[queueName]}`, options); | 
        
          |  | }); | 
        
          |  | } | 
        
          |  | subscribe(...queueNames) { | 
        
          |  | if (!queueNames.length) { | 
        
          |  | queueNames = this.queueNames; | 
        
          |  | } | 
        
          |  | queueNames.forEach( (queueName) => { | 
        
          |  | const subscriptionId = `${hostname}-${queueName}`; | 
        
          |  | const options = { 'activemq.prefetchSize': 1, ack: 'client-individual', id: subscriptionId }; | 
        
          |  | const cb = (body, headers) => { | 
        
          |  | const id = headers['message-id']; | 
        
          |  | const ack = () => this.client.ack(id, subscriptionId); | 
        
          |  | const nack = () => { | 
        
          |  | console.error('nack'); | 
        
          |  | // this.client.nack(id, subscriptionId); | 
        
          |  | }; | 
        
          |  | console.log(`Consuming at ${queueName} => ${this.inspect(headers)}`); | 
        
          |  | this.process(headers, { ack, nack, queueName }); | 
        
          |  | }; | 
        
          |  | this.client.subscribe(`/queue/${activemq.queues[queueName]}`, cb, options); | 
        
          |  | }); | 
        
          |  | } | 
        
          |  | process(job) { | 
        
          |  | throw 'This is a template method meant to be overriten by subclases'; | 
        
          |  | } | 
        
          |  | publish(queueName, job) { | 
        
          |  | console.log(`Publish at ${queueName} => ${this.inspect(job)}`); | 
        
          |  | this.client.publish(`/queue/${activemq.queues[queueName]}`, job, job); | 
        
          |  | const data = { ...job, queue: queueName }; | 
        
          |  | this.client.publish(`/queue/${activemq.queues.echo}`, data, data); | 
        
          |  | } | 
        
          |  | } | 
        
          |  |  | 
        
          |  | export default QueueClient; |