Guest User

Untitled

a guest
Mar 22nd, 2018
100
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.38 KB | None | 0 0
  1. require 'stan/client'
  2. require 'monitor'
  3. require 'thread'
  4.  
  # Minimal harness around a STAN client: it publishes a counter to the
  # 'test.nats' channel, subscribes to that channel, and closes the tracked
  # subscriptions on a background thread once a message arrives on the plain
  # NATS subject 'unsubscribe_all.goldstar'.
  class MinimalTest
    def initialize
      # Hand-off channel between the NATS control callback and the worker
      # thread that actually closes subscriptions.
      @queue = Queue.new
      # Every subscription handle returned by #subscribe_to_stan.
      @subscriptions = []
      # Number of messages published so far; also used as the payload.
      @count = 0
    end

    # Subscribes to 'test.nats' in queue group 'goldstar' and remembers the
    # returned handle so it can be closed later.
    #
    # @param durable [String] durable name passed through to the client
    # @return [Array] the list of tracked subscription handles
    def subscribe_to_stan(durable)
      @subscriptions << stan.subscribe('test.nats', queue: 'goldstar', durable_name: durable) do |payload|
        STDOUT.puts("got test.nats #{payload} #{durable}")
      end
    end

    # Publishes the next counter value (as a string) to 'test.nats'.
    def publish_to_stan
      @count += 1
      STDOUT.puts('publishing')
      stan.publish('test.nats', @count.to_s)
    end

    # Registers a handler on the underlying NATS connection. When a message
    # arrives on 'unsubscribe_all.goldstar', every tracked subscription is
    # handed to the worker started by #unsubscribe_thread.
    def unsubscriber
      STDOUT.puts('setting up unsubscriber')
      stan.nats.subscribe('unsubscribe_all.goldstar') do
        STDOUT.puts('unsubscribing')
        # The handles live in @subscriptions; the previous code pushed the
        # never-assigned @subscription (nil), which crashed the worker.
        # Shifting removes each handle from the tracking list so a repeated
        # control message does not try to close it a second time.
        @queue.push(@subscriptions.shift) until @subscriptions.empty?
      end
    end

    # Publishes a message every two seconds, forever.
    def execute
      loop do
        publish_to_stan
        sleep 2
      end
    end

    # Starts the worker that closes subscriptions handed over through the
    # queue. Closing happens here rather than inside the NATS callback.
    #
    # @return [Thread] the worker thread
    def unsubscribe_thread
      STDOUT.puts('running thread')
      Thread.new do
        # Keep draining so that all queued subscriptions get closed, not
        # just the first one.
        loop { @queue.pop.close }
      end
    end

    # Lazily connects and memoizes the STAN client. On first use it also
    # installs an error logger on the NATS connection and an at_exit hook
    # that closes the client.
    #
    # @return [STAN::Client]
    def stan
      @stan ||= STAN::Client.new.tap do |client|
        client.connect('gse_cluster', 'stan_test', nats: { servers: ['nats://0.0.0.0:4222'] })

        client.nats.on_error do |e|
          STDOUT.puts("NATS ERROR #{e}")
          STDOUT.puts(e.backtrace)
        end

        at_exit { client.close }
      end
    end
  end
  67.  
  # Driver: install the control-subject handler and the closing worker first,
  # then subscribe with durable name 'durable1' and publish forever.
  harness = MinimalTest.new
  harness.unsubscriber
  harness.unsubscribe_thread
  harness.subscribe_to_stan('durable1')
  harness.execute
Add Comment
Please, Sign In to add comment