Created
December 9, 2016 01:23
-
-
Save TianyiLi/ecf8ac0b9bb24ae0ddd91978770e8fa6 to your computer and use it in GitHub Desktop.
StompJS use in backend
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import * as Stomp from 'stompjs'; | |
export class STOMPService { | |
public state: STOMPState; | |
public messages: Stomp.Message; | |
private config: StompConfig; | |
private client: Stomp.Client; | |
private resolvePromise: { (...args: any[]): void }; | |
private connectCallBack: (mes: Stomp.Message) => any; | |
public constructor() { | |
this.state = STOMPState.CLOSED; | |
} | |
/** Set up configuration */ | |
public configure(config?: StompConfig): void { | |
// Check for errors: | |
if (this.state !== STOMPState.CLOSED) { | |
throw Error('Already running!'); | |
} | |
if (config === null && this.config === null) { | |
throw Error('No configuration provided!'); | |
} | |
// Set our configuration | |
if (config != null) { | |
this.config = config; | |
} | |
// Connecting via SSL Websocket? | |
let scheme = 'ws'; | |
if (this.config.ssl) { | |
scheme = 'wss'; | |
} | |
// Attempt connection, passing in a callback | |
this.client = Stomp.overWS(`${scheme}://${this.config.host}:${this.config.port}/stomp/websocket`); | |
// Configure client heartbeating | |
this.client.heartbeat.incoming = this.config.heartbeat_in; | |
this.client.heartbeat.outgoing = this.config.heartbeat_out; | |
// Set function to debug print messages | |
this.client.debug = this.config.debug || this.config.debug == null ? this.debug : null; | |
} | |
/** | |
* Perform connection to STOMP broker, returning a Promise | |
* which is resolved when connected. | |
* | |
* The CallBack function is used when the subscribed channel | |
* send data that the fn should do. | |
* | |
* @param {(message: Stomp.Message) => any} callback | |
* @returns {Promise<{}>} | |
* | |
* @memberOf STOMPService | |
*/ | |
public try_connect(callback: (message: Stomp.Message) => void): Promise<{}> { | |
if (this.state !== STOMPState.CLOSED) { | |
throw Error('Can\'t try_connect if not CLOSED!'); | |
} | |
if (this.client === null) { | |
throw Error('Client not configured!'); | |
} | |
this.connectCallBack = callback; | |
// Attempt connection, passing in a callback | |
this.client.connect( | |
this.config.user, | |
this.config.pass, | |
this.on_connect, | |
this.on_error | |
); | |
console.log('Connecting...'); | |
this.state = STOMPState.TRYING; | |
return new Promise( | |
(resolve, reject) => this.resolvePromise = resolve | |
); | |
} | |
/** Disconnect the STOMP client and clean up */ | |
public disconnect(message?: string): void { | |
// Notify observers that we are disconnecting! | |
this.state = STOMPState.DISCONNECTING; | |
// Disconnect. Callback will set CLOSED state | |
if (this.client) { | |
this.client.disconnect( | |
() => this.state = STOMPState.CLOSED, | |
message | |
); | |
} | |
} | |
/** Send a message to all topics */ | |
public publish(message: string, publish:string[]): void { | |
for (let t of publish) { | |
this.client.send(t, {}, message); | |
} | |
} | |
/** Subscribe to server message queues */ | |
public subscribe(): void { | |
// Subscribe to our configured queues | |
for (let t of this.config.subscribe) { | |
this.client.subscribe(t, this.connectCallBack, { ack: 'auto' }); | |
} | |
// Update the state | |
if (this.config.subscribe.length > 0) { | |
this.state = STOMPState.SUBSCRIBED; | |
} | |
} | |
/** | |
* Callback Functions | |
* | |
* Note the method signature: () => preserves lexical scope | |
* if we need to use this.x inside the function | |
*/ | |
public debug(...args: any[]): void { | |
// Push arguments to this function into console.log | |
if (console.log && console.log.apply) { | |
console.log.apply(console, args); | |
} | |
} | |
// Callback run on successfully connecting to server | |
public on_connect = () => { | |
console.log('Connected'); | |
// Indicate our connected state to observers | |
this.state = STOMPState.CONNECTED; | |
// Subscribe to message queues | |
this.subscribe(); | |
// Resolve our Promise to the caller | |
this.resolvePromise(); | |
// Clear callback | |
this.resolvePromise = null; | |
} | |
// Handle errors from stomp.js | |
public on_error = (error: string) => { | |
console.error('Error: ' + error); | |
// Check for dropped connection and try reconnecting | |
if (error.indexOf('Lost connection') !== -1) { | |
// Reset state indicator | |
this.state = STOMPState.CLOSED; | |
// Attempt reconnection | |
console.log('Reconnecting in 5 seconds...'); | |
setTimeout(() => { | |
this.configure(); | |
this.try_connect(this.connectCallBack); | |
}, 5000); | |
} | |
} | |
// On message RX, notify the Observable with the message object | |
public on_message = (message: Stomp.Message) => { | |
if (message.body) { | |
this.messages = message; | |
} else { | |
console.error('Empty message received!'); | |
} | |
} | |
} | |
/** | |
* Represents a configuration object for the | |
* STOMPService to connect to, pub, and sub. | |
*/ | |
export interface StompConfig { | |
// Which server? | |
host: string; | |
port: number; | |
ssl: boolean; | |
// What credentials? | |
user: string; | |
pass: string; | |
// Which queues? | |
publish: string[]; | |
subscribe: string[]; | |
// How often to heartbeat? | |
heartbeat_in?: number; | |
heartbeat_out?: number; | |
// Enable client debugging? | |
debug: boolean; | |
}; | |
/** possible states for the STOMP service */ | |
export enum STOMPState { | |
CLOSED, | |
TRYING, | |
CONNECTED, | |
SUBSCRIBED, | |
DISCONNECTING | |
}; | |
/** look up states for the STOMP service */ | |
export const StateLookup: string[] = [ | |
'CLOSED', | |
'TRYING', | |
'CONNECTED', | |
'SUBSCRIBED', | |
'DISCONNECTING' | |
]; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment