- From e3e6aa7578c94e0db043bdd99441bb017140f8c1 Mon Sep 17 00:00:00 2001
- From: "Michael S. Klishin" <michael@novemberain.com>
- Date: Tue, 21 Jun 2011 23:22:03 +0400
- Subject: [PATCH] Switch to AMQP::IntAllocator for channel id allocation,
- closes #83
- For very long-running apps that open and close channels aggressively, the previous implementation,
- which used a monotonically growing number, did not work well: ids of closed channels were never reused.
- IntAllocator takes released identifiers into account (we currently release an id when its channel
- is closed and on TCP connection interruption).
- It is a loose port of com.rabbitmq.utility.IntAllocator, built on top of a minimal port of
- java.util.BitSet. It is not as efficient as the RabbitMQ Java client implementation
- but is good enough for most cases.
- As soon as we implement AMQP::BitSet#next_set_bit (the Java implementation
- relies on java.lang.Long.numberOfTrailingZeros), the Java client implementation
- can be ported basically 1-to-1.
- Squashed commit of the following:
- commit 9466f7fe81c2371298e2833f15c44f588e27339b
- Author: Michael S. Klishin <michael@novemberain.com>
- Date: Tue Jun 21 23:21:39 2011 +0400
- Finished AMQP::IntAllocator
- commit 0b44b6fbe29aad91b01a181b871dadb15afc2164
- Merge: 219bfb1 7273d9d
- Author: Michael S. Klishin <michael@novemberain.com>
- Date: Tue Jun 21 19:43:08 2011 +0400
- Merge branch 'master' into smater_channel_id_allocator
- commit 219bfb101d2a244a22eb9720a3d7a745447490fd
- Author: Michael S. Klishin <michael@novemberain.com>
- Date: Tue Jun 21 19:42:20 2011 +0400
- Looks like >>> can be imitated by simply combining >> and Fixnum#abs, Clojure takes this approach
- commit 1a9d6e10595a7d5ad3cdb922844eb68693c25c5a
- Author: Michael S. Klishin <michael@novemberain.com>
- Date: Tue Jun 21 18:29:25 2011 +0400
- More bit twiddling porting. Oh Ruby, why don't you have unsigned right shift operator?
- commit b4d2a1b6a615e51dcf8d023e6e1b0dac36086eae
- Author: Michael S. Klishin <michael@novemberain.com>
- Date: Tue Jun 21 16:45:21 2011 +0400
- BitSet is a WIP
- commit bac25c83ebd23a75a21b58b8cef07e2becd9272f
- Author: Michael S. Klishin <michael@novemberain.com>
- Date: Tue Jun 21 16:44:47 2011 +0400
- Add example that demonstrates round robin using default exchange
- And does not suffer from "one consumer per queue" amqp gem (historical) limitation
- commit 792d7abd66877ac8e62ed440c3f8b5932fa33df2
- Author: Michael S. Klishin <michael@novemberain.com>
- Date: Tue Jun 21 04:07:10 2011 +0400
- Initial bits of a smarter channel allocator (ported from the RabbitMQ Java client and java.util.BitSet). References #83.
- commit 67af9368e29d611c8870d32d54539f4b18494cfd
- Author: Michael S. Klishin <michael@novemberain.com>
- Date: Mon Jun 20 23:34:23 2011 +0400
- Semantics of AMQP.channel has changed
- ---
- lib/amqp/bit_set.rb | 78 ++++++++++++++++
- lib/amqp/channel.rb | 52 ++++++++++-
- lib/amqp/int_allocator.rb | 94 +++++++++++++++++++
- spec/unit/amqp/basic_spec.rb | 39 --------
- spec/unit/amqp/bit_set_spec.rb | 127 ++++++++++++++++++++++++++
- spec/unit/amqp/channel_id_allocation_spec.rb | 38 ++++++++
- spec/unit/amqp/int_allocator_spec.rb | 116 +++++++++++++++++++++++
- 7 files changed, 500 insertions(+), 44 deletions(-)
- create mode 100644 lib/amqp/bit_set.rb
- create mode 100644 lib/amqp/int_allocator.rb
- delete mode 100644 spec/unit/amqp/basic_spec.rb
- create mode 100644 spec/unit/amqp/bit_set_spec.rb
- create mode 100644 spec/unit/amqp/channel_id_allocation_spec.rb
- create mode 100644 spec/unit/amqp/int_allocator_spec.rb
- diff --git a/lib/amqp/bit_set.rb b/lib/amqp/bit_set.rb
- new file mode 100644
- index 0000000..a9268d4
- --- /dev/null
- +++ b/lib/amqp/bit_set.rb
- @@ -0,0 +1,78 @@
- +module AMQP
- + # Very minimalistic, pure Ruby implementation of bit set. Inspired by java.util.BitSet,
- + # although significantly smaller in scope.
- + class BitSet
- +
- + #
- + # API
- + #
- +
- + ADDRESS_BITS_PER_WORD = 6
- + BITS_PER_WORD = (1 << ADDRESS_BITS_PER_WORD)
- + WORD_MASK = 0xffffffffffffffff
- +
- + # @param [Integer] nbits Number of bits in the set
- + # @api public
- + def initialize(nbits)
- + @nbits = nbits
- +
- + self.init_words(nbits)
- + end # initialize(nbits)
- +
- + # Sets (flags) the given bit. Setting a bit that is already set is allowed and raises no exception.
- + #
- + # @param [Integer] i A bit to set
- + # @api public
- + def set(i)
- + w = self.word_index(i)
- + @words[w] |= (1 << i)
- + end # set(i)
- +
- + # Fetches the flag value for the given bit.
- + #
- + # @param [Integer] i A bit to fetch
- + # @return [Boolean] true if given bit is set, false otherwise
- + # @api public
- + def get(i)
- + w = self.word_index(i)
- +
- + (@words[w] & (1 << i)) != 0
- + end # get(i)
- + alias [] get
- +
- + # Unsets (unflags) the given bit. Unsetting a bit that is already off is allowed and raises no exception.
- + #
- + # @param [Integer] i A bit to unset
- + # @api public
- + def unset(i)
- + w = self.word_index(i)
- + return if w.nil?
- +
- + @words[w] &= ~(1 << i)
- + end # unset(i)
- +
- + # Clears all bits in the set
- + # @api public
- + def clear
- + self.init_words(@nbits)
- + end # clear
- +
- +
- + #
- + # Implementation
- + #
- +
- + protected
- +
- + # @private
- + def init_words(nbits)
- + n = word_index(nbits-1) + 1
- + @words = Array.new(n) { 0 }
- + end # init_words
- +
- + # @private
- + def word_index(i)
- + i >> ADDRESS_BITS_PER_WORD
- + end # word_index(i)
- + end # BitSet
- +end # AMQP
- diff --git a/lib/amqp/channel.rb b/lib/amqp/channel.rb
- index a0db381..3ab85f4 100644
- --- a/lib/amqp/channel.rb
- +++ b/lib/amqp/channel.rb
- @@ -1,5 +1,6 @@
- # encoding: utf-8
- +require "amqp/int_allocator"
- require "amqp/exchange"
- require "amqp/queue"
- @@ -874,7 +875,10 @@ module AMQP
- #
- # @api public
- def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)
- - super(reply_code, reply_text, class_id, method_id, &block)
- + r = super(reply_code, reply_text, class_id, method_id, &block)
- + self.class.release_channel_id(@id)
- +
- + r
- end
- # @endgroup
- @@ -1094,6 +1098,7 @@ module AMQP
- # @private
- def handle_connection_interruption(exception = nil)
- super(exception)
- + self.class.release_channel_id(@id)
- @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
- end
- @@ -1104,18 +1109,55 @@ module AMQP
- @channel_id_mutex ||= Mutex.new
- end
- - # Returns incrementing channel id. This method is thread safe.
- + # Returns next available channel id. This method is thread safe.
- + #
- # @return [Fixnum]
- # @api public
- + # @see Channel.release_channel_id
- + # @see Channel.reset_channel_id_allocator
- def self.next_channel_id
- channel_id_mutex.synchronize do
- - @last_channel_id ||= 0
- - @last_channel_id += 1
- + self.initialize_channel_id_allocator
- - @last_channel_id
- + @int_allocator.allocate
- end
- end
- + # Releases previously allocated channel id. This method is thread safe.
- + #
- + # @param [Fixnum] i Channel id to release
- + # @api public
- + # @see Channel.next_channel_id
- + # @see Channel.reset_channel_id_allocator
- + def self.release_channel_id(i)
- + channel_id_mutex.synchronize do
- + self.initialize_channel_id_allocator
- +
- + @int_allocator.release(i)
- + end
- + end # self.release_channel_id(i)
- +
- + # Resets channel allocator. This method is thread safe.
- + # @api public
- + # @see Channel.next_channel_id
- + # @see Channel.release_channel_id
- + def self.reset_channel_id_allocator
- + channel_id_mutex.synchronize do
- + initialize_channel_id_allocator
- +
- + @int_allocator.reset
- + end
- + end # self.reset_channel_id_allocator
- +
- +
- + # @private
- + def self.initialize_channel_id_allocator
- + # TODO: ideally, this should agree with the negotiated max number of channels of the connection,
- + # but it is possible that this value is not yet available. MK.
- + max_channel = (1 << 16) - 1
- + @int_allocator ||= IntAllocator.new(1, max_channel)
- + end # self.initialize_channel_id_allocator
- +
- # @private
- # @api plugin
- def register_rpc(rpc)
- diff --git a/lib/amqp/int_allocator.rb b/lib/amqp/int_allocator.rb
- new file mode 100644
- index 0000000..08c314a
- --- /dev/null
- +++ b/lib/amqp/int_allocator.rb
- @@ -0,0 +1,94 @@
- +require "amqp/bit_set"
- +
- +module AMQP
- + # Simple bitset-based integer allocator, heavily inspired by com.rabbitmq.utility.IntAllocator class
- + # in the RabbitMQ Java client.
- + #
- + # Unlike a monotonically incrementing identifier, this allocator is suitable for very long-running programs
- + # that aggressively allocate and release channels.
- + class IntAllocator
- +
- + #
- + # API
- + #
- +
- + # @return [Integer] Number of integers in the allocation range
- + attr_reader :number_of_bits
- + # @return [Integer] Upper boundary of the integer range available for allocation
- + attr_reader :hi
- + # @return [Integer] Lower boundary of the integer range available for allocation
- + attr_reader :lo
- +
- + # @param [Integer] lo Lower boundary of the integer range available for allocation
- + # @param [Integer] hi Upper boundary of the integer range available for allocation
- + # @raise [ArgumentError] if upper boundary is not greater than the lower one
- + def initialize(lo, hi)
- + raise ArgumentError.new "upper boundary must be greater than the lower one (given: hi = #{hi}, lo = #{lo})" unless hi > lo
- +
- + @hi = hi
- + @lo = lo
- +
- + @number_of_bits = hi - lo
- + @range = Range.new(1, @number_of_bits)
- + @free_set = BitSet.new(@number_of_bits)
- + end # initialize(lo, hi)
- +
- + # Attempts to allocate the next available integer. If allocation succeeds, the allocated value is returned.
- + # Otherwise, -1 is returned.
- + #
- + # The current implementation of this method is O(n), where n is the number of bits in the range available for
- + # allocation.
- + #
- + # @return [Integer] Allocated integer if allocation succeeded, -1 otherwise.
- + def allocate
- + if n = find_unallocated_position
- + @free_set.set(n)
- +
- + n
- + else
- + -1
- + end
- + end # allocate
- +
- + # Releases previously allocated integer. If integer provided as argument was not previously allocated,
- + # this method has no effect.
- + #
- + # @return [NilClass] nil
- + def free(reservation)
- + @free_set.unset(reservation)
- + end # free(reservation)
- + alias release free
- +
- + # @return [Boolean] true if provided argument was previously allocated, false otherwise
- + def allocated?(reservation)
- + @free_set.get(reservation)
- + end # allocated?(reservation)
- +
- + # Releases the whole allocation range
- + def reset
- + @free_set.clear
- + end # reset
- +
- +
- +
- + protected
- +
- + # This implementation is significantly less efficient
- + # than what the RabbitMQ Java client has (based on java.util.BitSet#nextSetBit and
- + # java.lang.Long.numberOfTrailingZeros, and thus binary search over bits).
- + # But for channel id generation, this is a good enough implementation.
- + #
- + # @private
- + def find_unallocated_position
- + r = nil
- + @range.each do |i|
- + if !@free_set.get(i)
- + r = i
- + break
- + end
- + end
- +
- + r
- + end # find_unallocated_position
- + end # IntAllocator
- +end # AMQP
- diff --git a/spec/unit/amqp/basic_spec.rb b/spec/unit/amqp/basic_spec.rb
- deleted file mode 100644
- index 0822cdc..0000000
- --- a/spec/unit/amqp/basic_spec.rb
- +++ /dev/null
- @@ -1,39 +0,0 @@
- -# encoding: utf-8
- -
- -require 'spec_helper'
- -
- -describe AMQP do
- -
- - #
- - # Environment
- - #
- -
- - include EventedSpec::AMQPSpec
- -
- - default_timeout 5
- -
- - amqp_before do
- - @channel = AMQP::Channel.new
- - end
- -
- -
- - #
- - # Examples
- - #
- -
- -
- - describe ".channel" do
- - it 'gives each thread a separate channel' do
- - pending 'This is not implemented in current lib'
- - module AMQP
- - @@cur_channel = 0
- - end
- -
- - described_class.channel.should == 1
- -
- - Thread.new { described_class.channel }.value.should == 2
- - Thread.new { described_class.channel }.value.should == 3
- - done
- - end
- - end
- -end # describe AMQP
- diff --git a/spec/unit/amqp/bit_set_spec.rb b/spec/unit/amqp/bit_set_spec.rb
- new file mode 100644
- index 0000000..033ee52
- --- /dev/null
- +++ b/spec/unit/amqp/bit_set_spec.rb
- @@ -0,0 +1,127 @@
- +# encoding: utf-8
- +
- +require 'spec_helper'
- +
- +require "amqp/bit_set"
- +
- +
- +describe AMQP::BitSet do
- +
- + #
- + # Environment
- + #
- +
- + let(:nbits) { (1 << 16) - 1 }
- +
- +
- + #
- + # Examples
- + #
- +
- + describe "#get, #[]" do
- + describe "when bit at given position is set" do
- + subject do
- + o = described_class.new(nbits)
- + o.set(3)
- + o
- + end
- +
- + it "returns true" do
- + subject.get(3).should be_true
- + end # it
- + end # describe
- +
- + describe "when bit at given position is off" do
- + subject do
- + described_class.new(nbits)
- + end
- +
- + it "returns false" do
- + subject.get(5).should be_false
- + end # it
- + end # describe
- + end # describe
- +
- +
- + describe "#set" do
- + describe "when bit at given position is set" do
- + subject do
- + described_class.new(nbits)
- + end
- +
- + it "has no effect" do
- + subject.set(3)
- + subject.get(3).should be_true
- + subject.set(3)
- + subject[3].should be_true
- + end # it
- + end
- +
- + describe "when bit at given position is off" do
- + subject do
- + described_class.new(nbits)
- + end
- +
- + it "sets that bit" do
- + subject.set(3)
- + subject.get(3).should be_true
- +
- + subject.set(33)
- + subject.get(33).should be_true
- +
- + subject.set(3387)
- + subject.get(3387).should be_true
- + end
- + end # describe
- + end # describe
- +
- +
- + describe "#unset" do
- + describe "when bit at a given position is set" do
- + subject do
- + described_class.new(nbits)
- + end
- +
- + it "unsets that bit" do
- + subject.set(3)
- + subject.get(3).should be_true
- + subject.unset(3)
- + subject.get(3).should be_false
- + end # it
- + end # describe
- +
- +
- + describe "when bit at a given position is off" do
- + subject do
- + described_class.new(nbits)
- + end
- +
- + it "has no effect" do
- + subject.get(3).should be_false
- + subject.unset(3)
- + subject.get(3).should be_false
- + end # it
- + end # describe
- + end # describe
- +
- +
- +
- + describe "#clear" do
- + subject do
- + described_class.new(nbits)
- + end
- +
- + it "clears all bits" do
- + subject.set(3)
- + subject.get(3).should be_true
- +
- + subject.set(7668)
- + subject.get(7668).should be_true
- +
- + subject.clear
- +
- + subject.get(3).should be_false
- + subject.get(7668).should be_false
- + end
- + end
- +end
- diff --git a/spec/unit/amqp/channel_id_allocation_spec.rb b/spec/unit/amqp/channel_id_allocation_spec.rb
- new file mode 100644
- index 0000000..4148482
- --- /dev/null
- +++ b/spec/unit/amqp/channel_id_allocation_spec.rb
- @@ -0,0 +1,38 @@
- +require "spec_helper"
- +
- +describe AMQP::Channel do
- + describe ".next_channel_id" do
- + before :all do
- + described_class.reset_channel_id_allocator
- + end
- +
- + context "when there is a channel id available for allocation" do
- + it "returns that channel id" do
- + 1024.times { described_class.next_channel_id }
- +
- + described_class.next_channel_id.should == 1025
- + end
- + end
- +
- + context "when THERE IS NO channel id available for allocation" do
- + it "raises an exception"
- + end
- + end
- +
- +
- +
- + describe ".release_channel_id" do
- + before :all do
- + described_class.reset_channel_id_allocator
- + end
- +
- + it "releases that channel id" do
- + 1024.times { described_class.next_channel_id }
- + described_class.next_channel_id.should == 1025
- +
- + described_class.release_channel_id(128)
- + described_class.next_channel_id.should == 128
- + described_class.next_channel_id.should == 1026
- + end
- + end
- +end
- diff --git a/spec/unit/amqp/int_allocator_spec.rb b/spec/unit/amqp/int_allocator_spec.rb
- new file mode 100644
- index 0000000..c8c496e
- --- /dev/null
- +++ b/spec/unit/amqp/int_allocator_spec.rb
- @@ -0,0 +1,116 @@
- +# encoding: utf-8
- +
- +require 'spec_helper'
- +require "amqp/int_allocator"
- +
- +describe AMQP::IntAllocator do
- +
- + #
- + # Environment
- + #
- +
- + subject do
- + described_class.new(1, 5)
- + end
- +
- +
- + # ...
- +
- +
- + #
- + # Examples
- + #
- +
- + describe "#number_of_bits" do
- + it "returns number of bits available for allocation" do
- + subject.number_of_bits.should == 4
- + end
- + end
- +
- +
- + describe "#hi" do
- + it "returns upper bound of the allocation range" do
- + subject.hi.should == 5
- + end
- + end
- +
- + describe "#lo" do
- + it "returns lower bound of the allocation range" do
- + subject.lo.should == 1
- + end
- + end
- +
- +
- + describe "#allocate" do
- + context "when integer in the range is available" do
- + it "returns allocated integer" do
- + subject.allocate.should == 1
- + subject.allocate.should == 2
- + subject.allocate.should == 3
- + subject.allocate.should == 4
- +
- + subject.allocate.should == -1
- + end
- + end
- +
- + context "when integer in the range IS NOT available" do
- + it "returns -1" do
- + 4.times { subject.allocate }
- +
- + subject.allocate.should == -1
- + subject.allocate.should == -1
- + subject.allocate.should == -1
- + subject.allocate.should == -1
- + end
- + end
- + end
- +
- +
- + describe "#free" do
- + context "when the integer WAS allocated" do
- + it "frees that integer" do
- + 4.times { subject.allocate }
- + subject.allocate.should == -1
- +
- + subject.free(1)
- + subject.allocate.should == 1
- + subject.allocate.should == -1
- + subject.free(2)
- + subject.allocate.should == 2
- + subject.allocate.should == -1
- + subject.free(3)
- + subject.allocate.should == 3
- + subject.allocate.should == -1
- + end
- + end
- +
- + context "when the integer WAS NOT allocated" do
- + it "has no effect" do
- + 32.times { subject.free(1) }
- + subject.allocate.should == 1
- + end
- + end
- + end
- +
- +
- + describe "#allocated?" do
- + context "when given position WAS allocated" do
- + it "returns true" do
- + 3.times { subject.allocate }
- +
- + subject.allocated?(1).should be_true
- + subject.allocated?(2).should be_true
- + subject.allocated?(3).should be_true
- + end
- + end
- +
- + context "when given position WAS NOT allocated" do
- + it "returns false" do
- + 2.times { subject.allocate }
- +
- + subject.allocated?(3).should be_false
- + subject.allocated?(4).should be_false
- + end
- + end
- + end
- +end
- --
- 1.7.5.4
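
The commit message notes that a 1-to-1 port of the Java allocator is blocked on `AMQP::BitSet#next_set_bit`, which in Java relies on `java.lang.Long.numberOfTrailingZeros`. A minimal sketch of how those two pieces might look in pure Ruby follows. It is not part of the patch: the `BitSketch` module and both method names are hypothetical, and the sketch assumes each word stores only its own 64 bits (bit `i` at offset `i % 64`), which differs from the `1 << i` layout used in `lib/amqp/bit_set.rb` above. It also needs `Integer#bit_length` (Ruby 2.1 or later).

```ruby
# Hypothetical helpers, not part of the amqp gem or of this patch.
module BitSketch
  BITS_PER_WORD = 64
  WORD_MASK     = 0xffffffffffffffff

  # Number of zero bits below the lowest set bit of a 64-bit word.
  # Returns 64 for zero, mirroring java.lang.Long.numberOfTrailingZeros.
  def self.number_of_trailing_zeros(word)
    word &= WORD_MASK
    return BITS_PER_WORD if word.zero?

    # word & -word isolates the lowest set bit; bit_length - 1 is its index.
    (word & -word).bit_length - 1
  end

  # Index of the first set bit at or after from_index in an array of
  # 64-bit words, or -1 if there is no such bit.
  def self.next_set_bit(words, from_index)
    w = from_index / BITS_PER_WORD
    return -1 if w >= words.size

    # Ignore the bits below from_index in the first word examined.
    word = words[w] & ((WORD_MASK << (from_index % BITS_PER_WORD)) & WORD_MASK)
    loop do
      return w * BITS_PER_WORD + number_of_trailing_zeros(word) unless word.zero?

      w += 1
      return -1 if w >= words.size
      word = words[w] & WORD_MASK
    end
  end
end

BitSketch.next_set_bit([0, 0b1000], 0) # => 67 (bit 3 of the second word)
BitSketch.next_set_bit([0b10100], 3)   # => 4
BitSketch.next_set_bit([0b100], 3)     # => -1
```

Because whole words that are zero are skipped in one comparison, a lookup touches at most one word per 64 ids instead of testing every bit, which is the gap the O(n) note in `find_unallocated_position` refers to.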