Advertisement
Guest User

Untitled

a guest
Feb 13th, 2017
155
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.59 KB | None | 0 0
  1. import * as Stomp from 'stompjs';
  2. import { EventEmitter } from 'events';
  3.  
  4. export class STOMPService {
  5. public state: STOMPState;
  6. public messages: Stomp.Message;
  7. public emitter: StompEmitter = new EventEmitter();
  8.  
  9. private config: StompConfig;
  10. private client: Stomp.Client;
  11. private resolvePromise: { (...args: any[]): void };
  12. private connectCallBack: (mes: Stomp.Message) => any;
  13.  
  14. public constructor() {
  15. this.state = STOMPState.CLOSED;
  16. }
  17.  
  18. /** Set up configuration */
  19. public configure(config?: StompConfig): void {
  20.  
  21. // Check for errors:
  22. if (this.state !== STOMPState.CLOSED) {
  23. throw Error('Already running!');
  24. }
  25. if (config === null && this.config === null) {
  26. throw Error('No configuration provided!');
  27. }
  28.  
  29. // Set our configuration
  30. if (config != null) {
  31. this.config = config;
  32. }
  33.  
  34. // Connecting via SSL Websocket?
  35. let scheme = 'ws';
  36. if (this.config.ssl) {
  37. scheme = 'wss';
  38. }
  39.  
  40. // Attempt connection, passing in a callback
  41. this.client = Stomp.overWS(`${scheme}://${this.config.host}:${this.config.port}/stomp/websocket`);
  42.  
  43. // Configure client heartbeating
  44. this.client.heartbeat.incoming = this.config.heartbeat_in;
  45. this.client.heartbeat.outgoing = this.config.heartbeat_out;
  46.  
  47. // Set function to debug print messages
  48. this.client.debug = this.config.debug || this.config.debug == null ? this.debug : null;
  49. }
  50.  
  51.  
  52. /**
  53. * Perform connection to STOMP broker, returning a Promise
  54. * which is resolved when connected.
  55. *
  56. * The CallBack function is used when the subscribed channel
  57. * send data that the fn should do.
  58. *
  59. * @param {(message: Stomp.Message) => any} callback
  60. * @returns {Promise<{}>}
  61. *
  62. * @memberOf STOMPService
  63. */
  64. public try_connect(callback?: (message: Stomp.Message) => void): Promise<{}> {
  65.  
  66. if (this.state !== STOMPState.CLOSED) {
  67. throw Error('Can\'t try_connect if not CLOSED!');
  68. }
  69. if (this.client === null) {
  70. throw Error('Client not configured!');
  71. }
  72.  
  73. if (!!callback)
  74. this.connectCallBack = callback;
  75.  
  76. // Attempt connection, passing in a callback
  77. this.client.connect(
  78. this.config.user,
  79. this.config.pass,
  80. this.on_connect,
  81. this.on_error
  82. );
  83.  
  84. console.log('Connecting...');
  85. this.state = STOMPState.TRYING;
  86.  
  87. return new Promise(
  88. (resolve, reject) => this.resolvePromise = resolve
  89. );
  90. }
  91.  
  92.  
  93. /** Disconnect the STOMP client and clean up */
  94. public disconnect(message?: string): void {
  95.  
  96. // Notify observers that we are disconnecting!
  97. this.state = STOMPState.DISCONNECTING;
  98.  
  99. // Disconnect. Callback will set CLOSED state
  100. if (this.client) {
  101. this.client.disconnect(
  102. () => this.state = STOMPState.CLOSED,
  103. message
  104. );
  105. }
  106. }
  107.  
  108.  
  109. /** Send a message to all topics */
  110. public publish(message: string, publish: string[]): void {
  111.  
  112. for (let t of publish) {
  113. this.client.send(t, {}, message);
  114. }
  115. }
  116.  
  117.  
  118. /** Subscribe to server message queues */
  119. public subscribe(): void {
  120.  
  121. // Subscribe to our configured queues
  122. for (let t of this.config.subscribe) {
  123. this.client.subscribe(t, this.on_message, { ack: 'auto' });
  124. }
  125.  
  126. // Update the state
  127. if (this.config.subscribe.length > 0) {
  128. this.state = STOMPState.SUBSCRIBED;
  129. }
  130. }
  131.  
  132. /**
  133. * When recieve the message
  134. */
  135. on_message = (msg: Stomp.Message) => {
  136. if (msg.body) {
  137. this.emitter.emit('message', msg.body);
  138. if (!!this.connectCallBack && typeof this.connectCallBack === 'function') {
  139. this.connectCallBack(msg);
  140. }
  141. } else {
  142. console.log('Recieve empty message!')
  143. }
  144. }
  145.  
  146.  
  147. /**
  148. * Callback Functions
  149. *
  150. * Note the method signature: () => preserves lexical scope
  151. * if we need to use this.x inside the function
  152. */
  153. public debug(...args: any[]): void {
  154.  
  155. // Push arguments to this function into console.log
  156. if (console.log && console.log.apply) {
  157. console.log.apply(console, args);
  158. }
  159. }
  160.  
  161.  
  162. // Callback run on successfully connecting to server
  163. public on_connect = () => {
  164. console.log('Connected');
  165. // Indicate our connected state to observers
  166. this.state = STOMPState.CONNECTED;
  167. // Subscribe to message queues
  168. this.subscribe();
  169. // Resolve our Promise to the caller
  170. this.resolvePromise();
  171. // Clear callback
  172. this.resolvePromise = null;
  173. }
  174.  
  175.  
  176. // Handle errors from stomp.js
  177. public on_error = (error: string) => {
  178. console.error('Error: ' + error);
  179. // Check for dropped connection and try reconnecting
  180. if (error.indexOf('Lost connection') !== -1) {
  181. // Reset state indicator
  182. this.state = STOMPState.CLOSED;
  183. // Attempt reconnection
  184. console.log('Reconnecting in 5 seconds...');
  185. setTimeout(() => {
  186. this.configure();
  187. this.try_connect(this.connectCallBack);
  188. }, 5000);
  189. }
  190. }
  191.  
  192. public messageEmitter = () => {
  193. return this.emitter;
  194. }
  195. }
  196.  
  197. /**
  198. * Represents a configuration object for the
  199. * STOMPService to connect to, pub, and sub.
  200. */
  201. export interface StompConfig {
  202. // Which server?
  203. host: string;
  204. port: number;
  205. ssl: boolean;
  206.  
  207. // What credentials?
  208. user: string;
  209. pass: string;
  210.  
  211. // Which queues?
  212. publish: string[];
  213. subscribe: string[];
  214.  
  215. // How often to heartbeat?
  216. heartbeat_in?: number;
  217. heartbeat_out?: number;
  218.  
  219. // Enable client debugging?
  220. debug: boolean;
  221. };
  222.  
  223.  
  224. /** possible states for the STOMP service */
  225. export enum STOMPState {
  226. CLOSED,
  227. TRYING,
  228. CONNECTED,
  229. SUBSCRIBED,
  230. DISCONNECTING
  231. };
  232.  
  233. /** look up states for the STOMP service */
  234. export const StateLookup: string[] = [
  235. 'CLOSED',
  236. 'TRYING',
  237. 'CONNECTED',
  238. 'SUBSCRIBED',
  239. 'DISCONNECTING'
  240. ];
  241.  
  242. declare interface StompEmitter extends EventEmitter {
  243. on(events: string, listener: Function)
  244. on(events: 'message', listener: (msg: Stomp.Message) => void);
  245. on(events: 'send', listener: (data: {} | string) => void);
  246.  
  247. emit(events: string | symbol, ...args: any[]);
  248. emit(events: 'send', data: { e: string, arg: any });
  249. emit(events: 'message', data: string);
  250. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement