Skip to content

Instantly share code, notes, and snippets.

@TianyiLi
Created December 9, 2016 01:23
Show Gist options
  • Save TianyiLi/ecf8ac0b9bb24ae0ddd91978770e8fa6 to your computer and use it in GitHub Desktop.
Save TianyiLi/ecf8ac0b9bb24ae0ddd91978770e8fa6 to your computer and use it in GitHub Desktop.
StompJS use in backend
import * as Stomp from 'stompjs';
export class STOMPService {
public state: STOMPState;
public messages: Stomp.Message;
private config: StompConfig;
private client: Stomp.Client;
private resolvePromise: { (...args: any[]): void };
private connectCallBack: (mes: Stomp.Message) => any;
public constructor() {
this.state = STOMPState.CLOSED;
}
/** Set up configuration */
public configure(config?: StompConfig): void {
// Check for errors:
if (this.state !== STOMPState.CLOSED) {
throw Error('Already running!');
}
if (config === null && this.config === null) {
throw Error('No configuration provided!');
}
// Set our configuration
if (config != null) {
this.config = config;
}
// Connecting via SSL Websocket?
let scheme = 'ws';
if (this.config.ssl) {
scheme = 'wss';
}
// Attempt connection, passing in a callback
this.client = Stomp.overWS(`${scheme}://${this.config.host}:${this.config.port}/stomp/websocket`);
// Configure client heartbeating
this.client.heartbeat.incoming = this.config.heartbeat_in;
this.client.heartbeat.outgoing = this.config.heartbeat_out;
// Set function to debug print messages
this.client.debug = this.config.debug || this.config.debug == null ? this.debug : null;
}
/**
* Perform connection to STOMP broker, returning a Promise
* which is resolved when connected.
*
* The CallBack function is used when the subscribed channel
* send data that the fn should do.
*
* @param {(message: Stomp.Message) => any} callback
* @returns {Promise<{}>}
*
* @memberOf STOMPService
*/
public try_connect(callback: (message: Stomp.Message) => void): Promise<{}> {
if (this.state !== STOMPState.CLOSED) {
throw Error('Can\'t try_connect if not CLOSED!');
}
if (this.client === null) {
throw Error('Client not configured!');
}
this.connectCallBack = callback;
// Attempt connection, passing in a callback
this.client.connect(
this.config.user,
this.config.pass,
this.on_connect,
this.on_error
);
console.log('Connecting...');
this.state = STOMPState.TRYING;
return new Promise(
(resolve, reject) => this.resolvePromise = resolve
);
}
/** Disconnect the STOMP client and clean up */
public disconnect(message?: string): void {
// Notify observers that we are disconnecting!
this.state = STOMPState.DISCONNECTING;
// Disconnect. Callback will set CLOSED state
if (this.client) {
this.client.disconnect(
() => this.state = STOMPState.CLOSED,
message
);
}
}
/** Send a message to all topics */
public publish(message: string, publish:string[]): void {
for (let t of publish) {
this.client.send(t, {}, message);
}
}
/** Subscribe to server message queues */
public subscribe(): void {
// Subscribe to our configured queues
for (let t of this.config.subscribe) {
this.client.subscribe(t, this.connectCallBack, { ack: 'auto' });
}
// Update the state
if (this.config.subscribe.length > 0) {
this.state = STOMPState.SUBSCRIBED;
}
}
/**
* Callback Functions
*
* Note the method signature: () => preserves lexical scope
* if we need to use this.x inside the function
*/
public debug(...args: any[]): void {
// Push arguments to this function into console.log
if (console.log && console.log.apply) {
console.log.apply(console, args);
}
}
// Callback run on successfully connecting to server
public on_connect = () => {
console.log('Connected');
// Indicate our connected state to observers
this.state = STOMPState.CONNECTED;
// Subscribe to message queues
this.subscribe();
// Resolve our Promise to the caller
this.resolvePromise();
// Clear callback
this.resolvePromise = null;
}
// Handle errors from stomp.js
public on_error = (error: string) => {
console.error('Error: ' + error);
// Check for dropped connection and try reconnecting
if (error.indexOf('Lost connection') !== -1) {
// Reset state indicator
this.state = STOMPState.CLOSED;
// Attempt reconnection
console.log('Reconnecting in 5 seconds...');
setTimeout(() => {
this.configure();
this.try_connect(this.connectCallBack);
}, 5000);
}
}
// On message RX, notify the Observable with the message object
public on_message = (message: Stomp.Message) => {
if (message.body) {
this.messages = message;
} else {
console.error('Empty message received!');
}
}
}
/**
* Represents a configuration object for the
* STOMPService to connect to, pub, and sub.
*/
export interface StompConfig {
// Which server?
host: string;
port: number;
ssl: boolean;
// What credentials?
user: string;
pass: string;
// Which queues?
publish: string[];
subscribe: string[];
// How often to heartbeat?
heartbeat_in?: number;
heartbeat_out?: number;
// Enable client debugging?
debug: boolean;
};
/** possible states for the STOMP service */
export enum STOMPState {
CLOSED,
TRYING,
CONNECTED,
SUBSCRIBED,
DISCONNECTING
};
/** look up states for the STOMP service */
export const StateLookup: string[] = [
'CLOSED',
'TRYING',
'CONNECTED',
'SUBSCRIBED',
'DISCONNECTING'
];
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment