Advertisement
Guest User

Untitled

a guest
Dec 8th, 2016
120
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.91 KB | None | 0 0
import * as Stomp from 'stompjs';
  2.  
  3. export class STOMPService {
  4. public state: STOMPState;
  5. public messages: Stomp.Message;
  6. private config: StompConfig;
  7. private client: Stomp.Client;
  8. private resolvePromise: { (...args: any[]): void };
  9. private connectCallBack: (mes: Stomp.Message) => any;
  10.  
  11. public constructor() {
  12. this.state = STOMPState.CLOSED;
  13. }
  14.  
  15. /** Set up configuration */
  16. public configure(config?: StompConfig): void {
  17.  
  18. // Check for errors:
  19. if (this.state !== STOMPState.CLOSED) {
  20. throw Error('Already running!');
  21. }
  22. if (config === null && this.config === null) {
  23. throw Error('No configuration provided!');
  24. }
  25.  
  26. // Set our configuration
  27. if (config != null) {
  28. this.config = config;
  29. }
  30.  
  31. // Connecting via SSL Websocket?
  32. let scheme = 'ws';
  33. if (this.config.ssl) {
  34. scheme = 'wss';
  35. }
  36.  
  37. // Attempt connection, passing in a callback
  38. this.client = Stomp.overWS(`${scheme}://${this.config.host}:${this.config.port}/stomp/websocket`);
  39.  
  40. // Configure client heartbeating
  41. this.client.heartbeat.incoming = this.config.heartbeat_in;
  42. this.client.heartbeat.outgoing = this.config.heartbeat_out;
  43.  
  44. // Set function to debug print messages
  45. this.client.debug = this.config.debug || this.config.debug == null ? this.debug : null;
  46. }
  47.  
  48.  
  49. /**
  50. * Perform connection to STOMP broker, returning a Promise
  51. * which is resolved when connected.
  52. *
  53. * The CallBack function is used when the subscribed channel
  54. * send data that the fn should do.
  55. *
  56. * @param {(message: Stomp.Message) => any} callback
  57. * @returns {Promise<{}>}
  58. *
  59. * @memberOf STOMPService
  60. */
  61. public try_connect(callback: (message: Stomp.Message) => void): Promise<{}> {
  62.  
  63. if (this.state !== STOMPState.CLOSED) {
  64. throw Error('Can\'t try_connect if not CLOSED!');
  65. }
  66. if (this.client === null) {
  67. throw Error('Client not configured!');
  68. }
  69.  
  70. this.connectCallBack = callback;
  71.  
  72. // Attempt connection, passing in a callback
  73. this.client.connect(
  74. this.config.user,
  75. this.config.pass,
  76. this.on_connect,
  77. this.on_error
  78. );
  79.  
  80. console.log('Connecting...');
  81. this.state = STOMPState.TRYING;
  82.  
  83. return new Promise(
  84. (resolve, reject) => this.resolvePromise = resolve
  85. );
  86. }
  87.  
  88.  
  89. /** Disconnect the STOMP client and clean up */
  90. public disconnect(message?: string): void {
  91.  
  92. // Notify observers that we are disconnecting!
  93. this.state = STOMPState.DISCONNECTING;
  94.  
  95. // Disconnect. Callback will set CLOSED state
  96. if (this.client) {
  97. this.client.disconnect(
  98. () => this.state = STOMPState.CLOSED,
  99. message
  100. );
  101. }
  102. }
  103.  
  104.  
  105. /** Send a message to all topics */
  106. public publish(message: string, publish:string[]): void {
  107.  
  108. for (let t of publish) {
  109. this.client.send(t, {}, message);
  110. }
  111. }
  112.  
  113.  
  114. /** Subscribe to server message queues */
  115. public subscribe(): void {
  116.  
  117. // Subscribe to our configured queues
  118. for (let t of this.config.subscribe) {
  119. this.client.subscribe(t, this.connectCallBack, { ack: 'auto' });
  120. }
  121.  
  122. // Update the state
  123. if (this.config.subscribe.length > 0) {
  124. this.state = STOMPState.SUBSCRIBED;
  125. }
  126. }
  127.  
  128.  
  129. /**
  130. * Callback Functions
  131. *
  132. * Note the method signature: () => preserves lexical scope
  133. * if we need to use this.x inside the function
  134. */
  135. public debug(...args: any[]): void {
  136.  
  137. // Push arguments to this function into console.log
  138. if (console.log && console.log.apply) {
  139. console.log.apply(console, args);
  140. }
  141. }
  142.  
  143.  
  144. // Callback run on successfully connecting to server
  145. public on_connect = () => {
  146. console.log('Connected');
  147. // Indicate our connected state to observers
  148. this.state = STOMPState.CONNECTED;
  149. // Subscribe to message queues
  150. this.subscribe();
  151. // Resolve our Promise to the caller
  152. this.resolvePromise();
  153. // Clear callback
  154. this.resolvePromise = null;
  155. }
  156.  
  157.  
  158. // Handle errors from stomp.js
  159. public on_error = (error: string) => {
  160. console.error('Error: ' + error);
  161. // Check for dropped connection and try reconnecting
  162. if (error.indexOf('Lost connection') !== -1) {
  163. // Reset state indicator
  164. this.state = STOMPState.CLOSED;
  165. // Attempt reconnection
  166. console.log('Reconnecting in 5 seconds...');
  167. setTimeout(() => {
  168. this.configure();
  169. this.try_connect(this.connectCallBack);
  170. }, 5000);
  171. }
  172. }
  173.  
  174. // On message RX, notify the Observable with the message object
  175. public on_message = (message: Stomp.Message) => {
  176. if (message.body) {
  177. this.messages = message;
  178. } else {
  179. console.error('Empty message received!');
  180. }
  181. }
  182. }
  183.  
  184. /**
  185. * Represents a configuration object for the
  186. * STOMPService to connect to, pub, and sub.
  187. */
  188. export interface StompConfig {
  189. // Which server?
  190. host: string;
  191. port: number;
  192. ssl: boolean;
  193.  
  194. // What credentials?
  195. user: string;
  196. pass: string;
  197.  
  198. // Which queues?
  199. publish: string[];
  200. subscribe: string[];
  201.  
  202. // How often to heartbeat?
  203. heartbeat_in?: number;
  204. heartbeat_out?: number;
  205.  
  206. // Enable client debugging?
  207. debug: boolean;
  208. };
  209.  
  210.  
  211. /** possible states for the STOMP service */
  212. export enum STOMPState {
  213. CLOSED,
  214. TRYING,
  215. CONNECTED,
  216. SUBSCRIBED,
  217. DISCONNECTING
  218. };
  219.  
  220. /** look up states for the STOMP service */
  221. export const StateLookup: string[] = [
  222. 'CLOSED',
  223. 'TRYING',
  224. 'CONNECTED',
  225. 'SUBSCRIBED',
  226. 'DISCONNECTING'
  227. ];
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement