Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- require 'stan/client'
- require 'monitor'
- require 'thread'
# Minimal harness around a STAN (NATS Streaming) client: it publishes a
# counter to 'test.nats' every two seconds, subscribes to that subject as part
# of the 'goldstar' queue group, and closes its subscriptions when a message
# arrives on the plain NATS subject 'unsubscribe_all.goldstar'.
#
# Closing is handed off to a separate worker thread through a Queue instead of
# being done inside the NATS callback.
# NOTE(review): presumably this avoids closing a subscription from the client's
# own callback thread -- confirm against the stan/nats client documentation.
class MinimalTest
  def initialize
    @queue = Queue.new
    @subscriptions = []
    @count = 0
  end

  # Subscribes to 'test.nats' in the 'goldstar' queue group and remembers the
  # subscription handle so it can be closed later.
  #
  # @param durable [String] durable name passed to the STAN subscription
  # @return [Array] the list of subscription handles collected so far
  def subscribe_to_stan(durable)
    subscription = stan.subscribe('test.nats', queue: 'goldstar', durable_name: durable) do |payload|
      STDOUT.puts("got test.nats #{payload} #{durable}")
    end
    @subscriptions << subscription
  end

  # Publishes the next counter value (as a string) to 'test.nats'.
  def publish_to_stan
    @count += 1
    STDOUT.puts("publishing")
    stan.publish('test.nats', @count.to_s)
  end

  # Listens on the plain NATS subject 'unsubscribe_all.goldstar'. On each
  # message, every subscription collected by #subscribe_to_stan is handed to
  # the worker started by #unsubscribe_thread.
  def unsubscriber
    STDOUT.puts("setting up unsubscriber")
    stan.nats.subscribe("unsubscribe_all.goldstar") do
      STDOUT.puts("unsubscribing")
      # Drain the list so a repeated trigger does not enqueue (and close) the
      # same handle twice. The original pushed the never-assigned
      # @subscription (always nil), which made the worker crash on nil.close.
      @subscriptions.shift(@subscriptions.size).each do |subscription|
        @queue.push(subscription)
      end
    end
  end

  # Publishes forever, once every two seconds. Never returns.
  def execute
    loop do
      publish_to_stan
      sleep 2
    end
  end

  # Starts the worker that closes every subscription pushed onto the queue.
  #
  # @return [Thread] the worker thread
  def unsubscribe_thread
    STDOUT.puts('running thread')
    Thread.new do
      # Queue#pop blocks until an item arrives; it returns nil only once the
      # queue has been closed and emptied, which ends the loop.
      while (subscription = @queue.pop)
        subscription.close
      end
    end
  end

  # Lazily connects and memoizes the STAN client. On first use it also installs
  # an error logger on the underlying NATS connection and an at_exit hook that
  # closes the client.
  #
  # @return [STAN::Client] the connected client
  def stan
    @stan ||= STAN::Client.new.tap do |client|
      client.connect('gse_cluster', 'stan_test', nats: {
        servers: ['nats://0.0.0.0:4222']
      })
      client.nats.on_error do |e|
        STDOUT.puts("NATS ERROR #{e}")
        STDOUT.puts(e.backtrace)
      end
      at_exit do
        client.close
      end
    end
  end
end
# Script entry point: register the unsubscribe listener and its worker thread
# first, then subscribe with the 'durable1' durable name, and finally enter
# the publish loop (which does not return).
harness = MinimalTest.new
harness.unsubscriber
harness.unsubscribe_thread
harness.subscribe_to_stan('durable1')
harness.execute
Add Comment
Please, Sign In to add comment