Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import * as Stomp from 'stompjs';
- export class STOMPService {
- public state: STOMPState;
- public messages: Stomp.Message;
- private config: StompConfig;
- private client: Stomp.Client;
- private resolvePromise: { (...args: any[]): void };
- private connectCallBack: (mes: Stomp.Message) => any;
- public constructor() {
- this.state = STOMPState.CLOSED;
- }
- /** Set up configuration */
- public configure(config?: StompConfig): void {
- // Check for errors:
- if (this.state !== STOMPState.CLOSED) {
- throw Error('Already running!');
- }
- if (config === null && this.config === null) {
- throw Error('No configuration provided!');
- }
- // Set our configuration
- if (config != null) {
- this.config = config;
- }
- // Connecting via SSL Websocket?
- let scheme = 'ws';
- if (this.config.ssl) {
- scheme = 'wss';
- }
- // Attempt connection, passing in a callback
- this.client = Stomp.overWS(`${scheme}://${this.config.host}:${this.config.port}/stomp/websocket`);
- // Configure client heartbeating
- this.client.heartbeat.incoming = this.config.heartbeat_in;
- this.client.heartbeat.outgoing = this.config.heartbeat_out;
- // Set function to debug print messages
- this.client.debug = this.config.debug || this.config.debug == null ? this.debug : null;
- }
- /**
- * Perform connection to STOMP broker, returning a Promise
- * which is resolved when connected.
- *
- * The CallBack function is used when the subscribed channel
- * send data that the fn should do.
- *
- * @param {(message: Stomp.Message) => any} callback
- * @returns {Promise<{}>}
- *
- * @memberOf STOMPService
- */
- public try_connect(callback: (message: Stomp.Message) => void): Promise<{}> {
- if (this.state !== STOMPState.CLOSED) {
- throw Error('Can\'t try_connect if not CLOSED!');
- }
- if (this.client === null) {
- throw Error('Client not configured!');
- }
- this.connectCallBack = callback;
- // Attempt connection, passing in a callback
- this.client.connect(
- this.config.user,
- this.config.pass,
- this.on_connect,
- this.on_error
- );
- console.log('Connecting...');
- this.state = STOMPState.TRYING;
- return new Promise(
- (resolve, reject) => this.resolvePromise = resolve
- );
- }
- /** Disconnect the STOMP client and clean up */
- public disconnect(message?: string): void {
- // Notify observers that we are disconnecting!
- this.state = STOMPState.DISCONNECTING;
- // Disconnect. Callback will set CLOSED state
- if (this.client) {
- this.client.disconnect(
- () => this.state = STOMPState.CLOSED,
- message
- );
- }
- }
- /** Send a message to all topics */
- public publish(message: string, publish:string[]): void {
- for (let t of publish) {
- this.client.send(t, {}, message);
- }
- }
- /** Subscribe to server message queues */
- public subscribe(): void {
- // Subscribe to our configured queues
- for (let t of this.config.subscribe) {
- this.client.subscribe(t, this.connectCallBack, { ack: 'auto' });
- }
- // Update the state
- if (this.config.subscribe.length > 0) {
- this.state = STOMPState.SUBSCRIBED;
- }
- }
- /**
- * Callback Functions
- *
- * Note the method signature: () => preserves lexical scope
- * if we need to use this.x inside the function
- */
- public debug(...args: any[]): void {
- // Push arguments to this function into console.log
- if (console.log && console.log.apply) {
- console.log.apply(console, args);
- }
- }
- // Callback run on successfully connecting to server
- public on_connect = () => {
- console.log('Connected');
- // Indicate our connected state to observers
- this.state = STOMPState.CONNECTED;
- // Subscribe to message queues
- this.subscribe();
- // Resolve our Promise to the caller
- this.resolvePromise();
- // Clear callback
- this.resolvePromise = null;
- }
- // Handle errors from stomp.js
- public on_error = (error: string) => {
- console.error('Error: ' + error);
- // Check for dropped connection and try reconnecting
- if (error.indexOf('Lost connection') !== -1) {
- // Reset state indicator
- this.state = STOMPState.CLOSED;
- // Attempt reconnection
- console.log('Reconnecting in 5 seconds...');
- setTimeout(() => {
- this.configure();
- this.try_connect(this.connectCallBack);
- }, 5000);
- }
- }
- // On message RX, notify the Observable with the message object
- public on_message = (message: Stomp.Message) => {
- if (message.body) {
- this.messages = message;
- } else {
- console.error('Empty message received!');
- }
- }
- }
- /**
- * Represents a configuration object for the
- * STOMPService to connect to, pub, and sub.
- */
- export interface StompConfig {
- // Which server?
- host: string;
- port: number;
- ssl: boolean;
- // What credentials?
- user: string;
- pass: string;
- // Which queues?
- publish: string[];
- subscribe: string[];
- // How often to heartbeat?
- heartbeat_in?: number;
- heartbeat_out?: number;
- // Enable client debugging?
- debug: boolean;
- };
- /** possible states for the STOMP service */
- export enum STOMPState {
- CLOSED,
- TRYING,
- CONNECTED,
- SUBSCRIBED,
- DISCONNECTING
- };
- /** look up states for the STOMP service */
- export const StateLookup: string[] = [
- 'CLOSED',
- 'TRYING',
- 'CONNECTED',
- 'SUBSCRIBED',
- 'DISCONNECTING'
- ];
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement