Guest User

Untitled

a guest
Oct 1st, 2018
127
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.29 KB | None | 0 0
  1. import {
  2. Connection, Receiver, EventContext, ConnectionOptions, ReceiverOptions, delay, ReceiverEvents, types
  3. } from "../lib";
  4.  
  5. import * as dotenv from "dotenv"; // Optional for loading environment configuration from a .env (config) file
  6. dotenv.config();
  7.  
  8. const host = process.env.AMQP_HOST || "host";
  9. const username = process.env.AMQP_USERNAME || "sharedAccessKeyName";
  10. const password = process.env.AMQP_PASSWORD || "sharedAccessKeyValue";
  11. const port = parseInt(process.env.AMQP_PORT || "5671");
  12. const receiverAddress = process.env.RECEIVER_ADDRESS || "address";
  13.  
  14. async function main(): Promise<void> {
  15. const connectionOptions: ConnectionOptions = {
  16. transport: "tls",
  17. host: host,
  18. hostname: host,
  19. username: username,
  20. password: password,
  21. port: port,
  22. reconnect: false
  23. };
  24. const connection: Connection = new Connection(connectionOptions);
  25. const connection2: Connection = new Connection(connectionOptions);
  26. const receiverName = "receiver-1";
  27. // receive messages from the past one hour
  28. const filterClause = `amqp.annotation.x-opt-enqueued-time > '${Date.now() - 3600 * 1000}'`;
  29. const receiverOptions: ReceiverOptions = {
  30. name: receiverName,
  31. source: {
  32. address: receiverAddress,
  33. filter: {
  34. "apache.org:selector-filter:string": types.wrap_described(filterClause, 0x468C00000004)
  35. }
  36. },
  37. onSessionError: (context: EventContext) => {
  38. const sessionError = context.session && context.session.error;
  39. if (sessionError) {
  40. console.log(">>>>> [%s] An error occurred for session of receiver '%s': %O.",
  41. context.connection.id, context.receiver!.name, sessionError);
  42. }
  43. }
  44. };
  45.  
  46. await connection.open();
  47. const receiver: Receiver = await connection.createReceiver(receiverOptions);
  48. receiver.on(ReceiverEvents.message, (context: EventContext) => {
  49. console.log("[%s] Receiver '%s' received message: %O", context.connection.id,
  50. context.receiver!.name, context.message);
  51. });
  52. receiver.on(ReceiverEvents.receiverError, (context: EventContext) => {
  53. const receiverError = context.receiver && context.receiver.error;
  54. if (receiverError) {
  55. console.log(">>>>> [%s] An error occurred for receiver '%s': %O.",
  56. context.connection.id, receiverName, receiverError);
  57. }
  58. });
  59.  
  60.  
  61. await connection2.open();
  62. const receiver2Options = Object.assign(receiverOptions);
  63. receiver2Options.name = "receiver-2";
  64. const receiver2: Receiver = await connection2.createReceiver(receiver2Options);
  65. receiver2.on(ReceiverEvents.message, (context: EventContext) => {
  66. console.log("[%s] Receiver '%s' received message: %O", context.connection.id,
  67. context.receiver!.name, context.message);
  68. });
  69. receiver2.on(ReceiverEvents.receiverError, (context: EventContext) => {
  70. const receiverError = context.receiver && context.receiver.error;
  71. if (receiverError) {
  72. console.log(">>>>> [%s] An error occurred for receiver '%s': %O.",
  73. context.connection.id, receiverName, receiverError);
  74. }
  75. });
  76.  
  77. console.log("Done creating both the receivers on separate connections..");
  78. // sleeping for 2 mins to let the receiver receive messages and then closing it.
  79. await delay(120000);
  80. await receiver.close();
  81. await receiver2.close();
  82. await connection.close();
  83. await connection2.close();
  84. }
  85.  
  86. main().catch((err) => console.log(err));
Add Comment
Please, Sign In to add comment