Skip to content

Instantly share code, notes, and snippets.

@TianyiLi
Last active April 10, 2017 11:21
Show Gist options
  • Save TianyiLi/21fe0069a602f80986015ab68de49940 to your computer and use it in GitHub Desktop.
Save TianyiLi/21fe0069a602f80986015ab68de49940 to your computer and use it in GitHub Desktop.
stomp use in back-end
import * as Stomp from 'webstomp-client';
import * as WebSocket from 'ws'
export class STOMP {
public state: STOMPState;
public messages: Stomp.Message;
private _config: StompConfig;
private _client: Stomp.Client;
private _resolvePromise: { (...args: any[]): void };
private _connectCallBack: (mes: Stomp.Message) => any;
private _subscribeList: { id: Stomp.Subscription, content: string }[] = []
public constructor() {
this.state = STOMPState.CLOSED;
}
/** Set up configuration */
public configure(config?: StompConfig): void {
// Check for errors:
if (this.state !== STOMPState.CLOSED) {
throw Error('Already running!');
}
if (config === null && this._config === null) {
throw Error('No configuration provided!');
}
// Set our configuration
if (config != null) {
this._config = config;
}
// Connecting via SSL Websocket?
let scheme = 'ws';
if (this._config.ssl) {
scheme = 'wss';
}
// Attempt connection, passing in a callback
this._client = Stomp.over(new WebSocket(`${scheme}://${this._config.host}:${this._config.port}/stomp/websocket`), {
heartbeat: { outgoing: this._config.heartbeat_out, incoming: this._config.heartbeat_in },
debug: this._config.debug,
binary: false,
protocols: [this._config.port.toString()]
});
}
/**
* Perform connection to STOMP broker, returning a Promise
* which is resolved when connected.
*
* The CallBack function is used when the subscribed channel
* send data that the fn should do.
*
* @param {(message: Stomp.Message) => any} callback
* @returns {Promise<{}>}
*
* @memberOf STOMPService
*/
public try_connect(callback: (message: Stomp.Message) => void): Promise<{}> {
if (this.state !== STOMPState.CLOSED) {
throw Error('Can\'t try_connect if not CLOSED!');
}
if (this._client === null) {
throw Error('Client not configured!');
}
this._connectCallBack = callback;
// Attempt connection, passing in a callback
this._client.connect(
this._config.user,
this._config.pass,
this.on_connect,
this.on_error
);
console.log('Connecting...');
this.state = STOMPState.TRYING;
return new Promise(
(resolve, reject) => this._resolvePromise = resolve
);
}
/** Disconnect the STOMP client and clean up */
public disconnect(message?: string): void {
// Notify observers that we are disconnecting!
this.state = STOMPState.DISCONNECTING;
// Disconnect. Callback will set CLOSED state
if (this._client) {
this._client.disconnect(
() => this.state = STOMPState.CLOSED,
message
);
}
}
/** Send a message to all topics */
public publish(message: string, publish: string[]): void {
for (let t of publish) {
this._client.send(t, JSON.stringify(message), {"content-type":"text/plain"});
}
}
/** Subscribe to server message queues */
public subscribe(): void {
// Subscribe to our configured queues
for (let t of this._config.subscribe) {
this.subscribeChannel(t);
}
// Update the state
if (this._config.subscribe.length > 0) {
this.state = STOMPState.SUBSCRIBED;
}
}
public subscribeChannel(channel: string) {
let id = this._client.subscribe(channel, this._connectCallBack, { ack: 'auto' })
this._subscribeList.push({ id: id, content: channel });
}
public unSubscribe(channel: string) {
let { id, content } = this._subscribeList.find(ele => ele.content === channel)
id.unsubscribe();
this._subscribeList.splice(this._subscribeList.indexOf({ id, content }), 1);
}
/**
* Callback Functions
*
* Note the method signature: () => preserves lexical scope
* if we need to use this.x inside the function
*/
public debug(...args: any[]): void {
// Push arguments to this function into console.log
if (console.log && console.log.apply) {
console.log.apply(console, args);
}
}
// Callback run on successfully connecting to server
public on_connect = () => {
console.log('Connected');
// Indicate our connected state to observers
this.state = STOMPState.CONNECTED;
// Subscribe to message queues
this.subscribe();
// Resolve our Promise to the caller
this._resolvePromise();
// Clear callback
this._resolvePromise = null;
}
// Handle errors from stomp.js
public on_error = (error: string) => {
console.error('Error: ', error)
// Check for dropped connection and try reconnecting
if (error) {
// Reset state indicator
this.state = STOMPState.CLOSED;
// Attempt reconnection
console.log('Reconnecting in 5 seconds...');
setTimeout(() => {
this.configure();
this.try_connect(this._connectCallBack);
}, 5000);
}
}
}
/**
* Represents a configuration object for the
* STOMPService to connect to, pub, and sub.
*/
export interface StompConfig {
// Which server?
host: string;
port: number;
ssl: boolean;
// What credentials?
user: string;
pass: string;
// Which queues?
publish: string[];
subscribe: string[];
// How often to heartbeat?
heartbeat_in?: number;
heartbeat_out?: number;
// Enable client debugging?
debug: boolean;
};
/** possible states for the STOMP service */
export enum STOMPState {
CLOSED,
TRYING,
CONNECTED,
SUBSCRIBED,
DISCONNECTING
};
/** look up states for the STOMP service */
export const StateLookup: string[] = [
'CLOSED',
'TRYING',
'CONNECTED',
'SUBSCRIBED',
'DISCONNECTING'
];
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment