  1. From e3e6aa7578c94e0db043bdd99441bb017140f8c1 Mon Sep 17 00:00:00 2001
  2. From: "Michael S. Klishin" <michael@novemberain.com>
  3. Date: Tue, 21 Jun 2011 23:22:03 +0400
  4. Subject: [PATCH] Switch to AMQP::IntAllocator for channel id allocation,
  5. closes #83
  6.  
  7. For very long-running apps that open and close channels aggressively, the previous implementation,
  8. which used a monotonically growing number, did not work very well. IntAllocator takes released
  9. identifiers into account (we currently release when a channel is closed and on TCP connection
  10. interruption).
  11.  
  12. It is a loose port of com.rabbitmq.utility.IntAllocator on top of
  13. java.util.BitSet. Not as efficient as the RabbitMQ Java client implementation
  14. but good enough for most cases.
  15.  
  16. As soon as we implement AMQP::BitSet#next_set_bit (in the Java implementation it
  17. relies on java.lang.Long.numberOfTrailingZeros), the Java client implementation
  18. can be ported basically 1-to-1.
  19.  
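The missing piece named above, a next_set_bit built on a trailing-zeros count, could look roughly like the sketch below. It is not the gem's code: number_of_trailing_zeros, next_set_bit and the words layout (an Array of Integers holding 64 bits each, bit 0 least significant) are assumptions made for illustration, and Integer#bit_length needs Ruby 2.1 or later.

```ruby
# Hypothetical helpers for illustration; not part of the amqp gem.
BITS_PER_WORD = 64
WORD_MASK     = (1 << BITS_PER_WORD) - 1

# Index of the lowest set bit of a positive integer.
def number_of_trailing_zeros(word)
  raise ArgumentError, "word must be positive" unless word > 0

  # word & -word isolates the lowest set bit; bit_length - 1 is its index.
  (word & -word).bit_length - 1
end

# Index of the first set bit at or after +from+, or nil if there is none.
def next_set_bit(words, from)
  w = from / BITS_PER_WORD
  return nil if w >= words.size

  # Drop the bits below +from+ in the first word examined.
  word = words[w] & (WORD_MASK << (from % BITS_PER_WORD)) & WORD_MASK
  loop do
    return w * BITS_PER_WORD + number_of_trailing_zeros(word) unless word.zero?

    w += 1
    return nil if w >= words.size

    word = words[w] & WORD_MASK
  end
end
```

Whole zero words are skipped in one step, which is where the gain over a bit-by-bit scan would come from.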
  20. Squashed commit of the following:
  21.  
  22. commit 9466f7fe81c2371298e2833f15c44f588e27339b
  23. Author: Michael S. Klishin <michael@novemberain.com>
  24. Date: Tue Jun 21 23:21:39 2011 +0400
  25.  
  26. Finished AMQP::IntAllocator
  27.  
  28. commit 0b44b6fbe29aad91b01a181b871dadb15afc2164
  29. Merge: 219bfb1 7273d9d
  30. Author: Michael S. Klishin <michael@novemberain.com>
  31. Date: Tue Jun 21 19:43:08 2011 +0400
  32.  
  33. Merge branch 'master' into smater_channel_id_allocator
  34.  
  35. commit 219bfb101d2a244a22eb9720a3d7a745447490fd
  36. Author: Michael S. Klishin <michael@novemberain.com>
  37. Date: Tue Jun 21 19:42:20 2011 +0400
  38.  
  39. Looks like >>> can be imitated by simply combining >> and Fixnum#abs, Clojure takes this approach
  40.  
  41. commit 1a9d6e10595a7d5ad3cdb922844eb68693c25c5a
  42. Author: Michael S. Klishin <michael@novemberain.com>
  43. Date: Tue Jun 21 18:29:25 2011 +0400
  44.  
  45. More bit twiddling porting. Oh Ruby, why don't you have unsigned right shift operator?
  46.  
  47. commit b4d2a1b6a615e51dcf8d023e6e1b0dac36086eae
  48. Author: Michael S. Klishin <michael@novemberain.com>
  49. Date: Tue Jun 21 16:45:21 2011 +0400
  50.  
  51. BitSet is a WIP
  52.  
  53. commit bac25c83ebd23a75a21b58b8cef07e2becd9272f
  54. Author: Michael S. Klishin <michael@novemberain.com>
  55. Date: Tue Jun 21 16:44:47 2011 +0400
  56.  
  57. Add example that demonstrates round robin using default exchange
  58.  
  59. And does not suffer from "one consumer per queue" amqp gem (historical) limitation
  60.  
  61. commit 792d7abd66877ac8e62ed440c3f8b5932fa33df2
  62. Author: Michael S. Klishin <michael@novemberain.com>
  63. Date: Tue Jun 21 04:07:10 2011 +0400
  64.  
  65. Initial bits of a smarter channel allocator (ported from the RabbitMQ Java client and java.util.BitSet). References #83.
  66.  
  67. commit 67af9368e29d611c8870d32d54539f4b18494cfd
  68. Author: Michael S. Klishin <michael@novemberain.com>
  69. Date: Mon Jun 20 23:34:23 2011 +0400
  70.  
  71. Semantics of AMQP.channel has changed
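
A note on the >>> remark in the squashed history above: for negative operands, >> followed by abs does not give the same result as Java's unsigned shift. In Ruby, (-1 >> 1).abs is 1, while Java's -1L >>> 1 is 0x7fffffffffffffff. One way to get the Java result, assuming 64-bit long semantics, is to mask to 64 bits before shifting. The helper name below is hypothetical.

```ruby
# Hypothetical helper; emulates Java's `value >>> n` for 64-bit longs.
MASK_64 = 0xffffffffffffffff

def unsigned_right_shift_64(value, n)
  # Reinterpret the value as an unsigned 64-bit quantity, then shift.
  # Java uses only the low six bits of the shift distance for long operands.
  (value & MASK_64) >> (n & 63)
end

unsigned_right_shift_64(16, 2) # same as 16 >> 2 for non-negative input
unsigned_right_shift_64(-1, 1) # differs from (-1 >> 1).abs
```

For non-negative inputs below 2**63 the helper and a plain >> agree, so the difference only shows up once the sign bit is involved.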
  72. ---
  73. lib/amqp/bit_set.rb | 78 ++++++++++++++++
  74. lib/amqp/channel.rb | 52 ++++++++++-
  75. lib/amqp/int_allocator.rb | 94 +++++++++++++++++++
  76. spec/unit/amqp/basic_spec.rb | 39 --------
  77. spec/unit/amqp/bit_set_spec.rb | 127 ++++++++++++++++++++++++++
  78. spec/unit/amqp/channel_id_allocation_spec.rb | 38 ++++++++
  79. spec/unit/amqp/int_allocator_spec.rb | 116 +++++++++++++++++++++++
  80. 7 files changed, 500 insertions(+), 44 deletions(-)
  81. create mode 100644 lib/amqp/bit_set.rb
  82. create mode 100644 lib/amqp/int_allocator.rb
  83. delete mode 100644 spec/unit/amqp/basic_spec.rb
  84. create mode 100644 spec/unit/amqp/bit_set_spec.rb
  85. create mode 100644 spec/unit/amqp/channel_id_allocation_spec.rb
  86. create mode 100644 spec/unit/amqp/int_allocator_spec.rb
  87.  
  88. diff --git a/lib/amqp/bit_set.rb b/lib/amqp/bit_set.rb
  89. new file mode 100644
  90. index 0000000..a9268d4
  91. --- /dev/null
  92. +++ b/lib/amqp/bit_set.rb
  93. @@ -0,0 +1,78 @@
  94. +module AMQP
  95. + # Very minimalistic, pure Ruby implementation of bit set. Inspired by java.util.BitSet,
  96. + # although significantly smaller in scope.
  97. + class BitSet
  98. +
  99. + #
  100. + # API
  101. + #
  102. +
  103. + ADDRESS_BITS_PER_WORD = 6
  104. + BITS_PER_WORD = (1 << ADDRESS_BITS_PER_WORD)
  105. + WORD_MASK = 0xffffffffffffffff
  106. +
  107. + # @param [Integer] nbits Number of bits in the set
  108. + # @api public
  109. + def initialize(nbits)
  110. + @nbits = nbits
  111. +
  112. + self.init_words(nbits)
  113. + end # initialize(nbits)
  114. +
  115. + # Sets (flags) given bit. Bits may be set more than once in a row; no exception will be raised.
  116. + #
  117. + # @param [Integer] i A bit to set
  118. + # @api public
  119. + def set(i)
  120. + w = self.word_index(i)
  121. + @words[w] |= (1 << i)
  122. + end # set(i)
  123. +
  124. + # Fetches flag value for given bit.
  125. + #
  126. + # @param [Integer] i A bit to fetch
  127. + # @return [Boolean] true if given bit is set, false otherwise
  128. + # @api public
  129. + def get(i)
  130. + w = self.word_index(i)
  131. +
  132. + (@words[w] & (1 << i)) != 0
  133. + end # get(i)
  134. + alias [] get
  135. +
  136. + # Unsets (unflags) given bit. Bits may be unset more than once in a row; no exception will be raised.
  137. + #
  138. + # @param [Integer] i A bit to unset
  139. + # @api public
  140. + def unset(i)
  141. + w = self.word_index(i)
  142. + return if w.nil?
  143. +
  144. + @words[w] &= ~(1 << i)
  145. + end # unset(i)
  146. +
  147. + # Clears all bits in the set
  148. + # @api public
  149. + def clear
  150. + self.init_words(@nbits)
  151. + end # clear
  152. +
  153. +
  154. + #
  155. + # Implementation
  156. + #
  157. +
  158. + protected
  159. +
  160. + # @private
  161. + def init_words(nbits)
  162. + n = word_index(nbits-1) + 1
  163. + @words = Array.new(n) { 1 }
  164. + end # init_words
  165. +
  166. + # @private
  167. + def word_index(i)
  168. + i >> ADDRESS_BITS_PER_WORD
  169. + end # word_index(i)
  170. + end # BitSet
  171. +end # AMQP
  172. diff --git a/lib/amqp/channel.rb b/lib/amqp/channel.rb
  173. index a0db381..3ab85f4 100644
  174. --- a/lib/amqp/channel.rb
  175. +++ b/lib/amqp/channel.rb
  176. @@ -1,5 +1,6 @@
  177. # encoding: utf-8
  178.  
  179. +require "amqp/int_allocator"
  180. require "amqp/exchange"
  181. require "amqp/queue"
  182.  
  183. @@ -874,7 +875,10 @@ module AMQP
  184. #
  185. # @api public
  186. def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)
  187. - super(reply_code, reply_text, class_id, method_id, &block)
  188. + r = super(reply_code, reply_text, class_id, method_id, &block)
  189. + self.class.release_channel_id(@id)
  190. +
  191. + r
  192. end
  193.  
  194. # @endgroup
  195. @@ -1094,6 +1098,7 @@ module AMQP
  196. # @private
  197. def handle_connection_interruption(exception = nil)
  198. super(exception)
  199. + self.class.release_channel_id(@id)
  200. @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
  201. end
  202.  
  203. @@ -1104,18 +1109,55 @@ module AMQP
  204. @channel_id_mutex ||= Mutex.new
  205. end
  206.  
  207. - # Returns incrementing channel id. This method is thread safe.
  208. + # Returns next available channel id. This method is thread safe.
  209. + #
  210. # @return [Fixnum]
  211. # @api public
  212. + # @see Channel.release_channel_id
  213. + # @see Channel.reset_channel_id_allocator
  214. def self.next_channel_id
  215. channel_id_mutex.synchronize do
  216. - @last_channel_id ||= 0
  217. - @last_channel_id += 1
  218. + self.initialize_channel_id_allocator
  219.  
  220. - @last_channel_id
  221. + @int_allocator.allocate
  222. end
  223. end
  224.  
  225. + # Releases previously allocated channel id. This method is thread safe.
  226. + #
  227. + # @param [Fixnum] i Channel id to release
  228. + # @api public
  229. + # @see Channel.next_channel_id
  230. + # @see Channel.reset_channel_id_allocator
  231. + def self.release_channel_id(i)
  232. + channel_id_mutex.synchronize do
  233. + self.initialize_channel_id_allocator
  234. +
  235. + @int_allocator.release(i)
  236. + end
  237. + end # self.release_channel_id(i)
  238. +
  239. + # Resets channel allocator. This method is thread safe.
  240. + # @api public
  241. + # @see Channel.next_channel_id
  242. + # @see Channel.release_channel_id
  243. + def self.reset_channel_id_allocator
  244. + channel_id_mutex.synchronize do
  245. + initialize_channel_id_allocator
  246. +
  247. + @int_allocator.reset
  248. + end
  249. + end # self.reset_channel_id_allocator
  250. +
  251. +
  252. + # @private
  253. + def self.initialize_channel_id_allocator
  254. + # TODO: ideally, this should agree with the negotiated max number of channels of the connection,
  255. + # but it is possible that the value is not yet available. MK.
  256. + max_channel = (1 << 16) - 1
  257. + @int_allocator ||= IntAllocator.new(1, max_channel)
  258. + end # self.initialize_channel_id_allocator
  259. +
  260. # @private
  261. # @api plugin
  262. def register_rpc(rpc)
  263. diff --git a/lib/amqp/int_allocator.rb b/lib/amqp/int_allocator.rb
  264. new file mode 100644
  265. index 0000000..08c314a
  266. --- /dev/null
  267. +++ b/lib/amqp/int_allocator.rb
  268. @@ -0,0 +1,94 @@
  269. +require "amqp/bit_set"
  270. +
  271. +module AMQP
  272. + # Simple bitset-based integer allocator, heavily inspired by com.rabbitmq.utility.IntAllocator class
  273. + # in the RabbitMQ Java client.
  274. + #
  275. + # Unlike a monotonically incrementing identifier, this allocator is suitable for very long-running programs
  276. + # that aggressively allocate and release channels.
  277. + class IntAllocator
  278. +
  279. + #
  280. + # API
  281. + #
  282. +
  283. + # @return [Integer] Number of integers in the allocation range
  284. + attr_reader :number_of_bits
  285. + # @return [Integer] Upper boundary of the integer range available for allocation
  286. + attr_reader :hi
  287. + # @return [Integer] Lower boundary of the integer range available for allocation
  288. + attr_reader :lo
  289. +
  290. + # @param [Integer] lo Lower boundary of the integer range available for allocation
  291. + # @param [Integer] hi Upper boundary of the integer range available for allocation
  292. + # @raise [ArgumentError] if upper boundary is not greater than the lower one
  293. + def initialize(lo, hi)
  294. + raise ArgumentError.new "upper boundary must be greater than the lower one (given: hi = #{hi}, lo = #{lo})" unless hi > lo
  295. +
  296. + @hi = hi
  297. + @lo = lo
  298. +
  299. + @number_of_bits = hi - lo
  300. + @range = Range.new(1, @number_of_bits)
  301. + @free_set = BitSet.new(@number_of_bits)
  302. + end # initialize(lo, hi)
  303. +
  304. + # Attempts to allocate the next available integer. If allocation succeeds, the allocated value is returned.
  305. + # Otherwise, -1 is returned.
  306. + #
  307. + # Current implementation of this method is O(n), where n is the number of bits in the range available for
  308. + # allocation.
  309. + #
  310. + # @return [Integer] Allocated integer if allocation succeeded, -1 otherwise.
  311. + def allocate
  312. + if n = find_unallocated_position
  313. + @free_set.set(n)
  314. +
  315. + n
  316. + else
  317. + -1
  318. + end
  319. + end # allocate
  320. +
  321. + # Releases previously allocated integer. If integer provided as argument was not previously allocated,
  322. + # this method has no effect.
  323. + #
  324. + # @return [NilClass] nil
  325. + def free(reservation)
  326. + @free_set.unset(reservation)
  327. + end # free(reservation)
  328. + alias release free
  329. +
  330. + # @return [Boolean] true if provided argument was previously allocated, false otherwise
  331. + def allocated?(reservation)
  332. + @free_set.get(reservation)
  333. + end # allocated?(reservation)
  334. +
  335. + # Releases the whole allocation range
  336. + def reset
  337. + @free_set.clear
  338. + end # reset
  339. +
  340. +
  341. +
  342. + protected
  343. +
  344. + # This implementation is significantly less efficient
  345. + # than what the RabbitMQ Java client has (based on java.util.BitSet#nextSetBit and
  346. + # java.lang.Long.numberOfTrailingZeros, and thus binary search over bits).
  347. + # But for channel id generation, this is a good enough implementation.
  348. + #
  349. + # @private
  350. + def find_unallocated_position
  351. + r = nil
  352. + @range.each do |i|
  353. + if !@free_set.get(i)
  354. + r = i
  355. + break
  356. + end
  357. + end
  358. +
  359. + r
  360. + end # find_unallocated_position
  361. + end # IntAllocator
  362. +end # AMQP
  363. diff --git a/spec/unit/amqp/basic_spec.rb b/spec/unit/amqp/basic_spec.rb
  364. deleted file mode 100644
  365. index 0822cdc..0000000
  366. --- a/spec/unit/amqp/basic_spec.rb
  367. +++ /dev/null
  368. @@ -1,39 +0,0 @@
  369. -# encoding: utf-8
  370. -
  371. -require 'spec_helper'
  372. -
  373. -describe AMQP do
  374. -
  375. - #
  376. - # Environment
  377. - #
  378. -
  379. - include EventedSpec::AMQPSpec
  380. -
  381. - default_timeout 5
  382. -
  383. - amqp_before do
  384. - @channel = AMQP::Channel.new
  385. - end
  386. -
  387. -
  388. - #
  389. - # Examples
  390. - #
  391. -
  392. -
  393. - describe ".channel" do
  394. - it 'gives each thread a separate channel' do
  395. - pending 'This is not implemented in current lib'
  396. - module AMQP
  397. - @@cur_channel = 0
  398. - end
  399. -
  400. - described_class.channel.should == 1
  401. -
  402. - Thread.new { described_class.channel }.value.should == 2
  403. - Thread.new { described_class.channel }.value.should == 3
  404. - done
  405. - end
  406. - end
  407. -end # describe AMQP
  408. diff --git a/spec/unit/amqp/bit_set_spec.rb b/spec/unit/amqp/bit_set_spec.rb
  409. new file mode 100644
  410. index 0000000..033ee52
  411. --- /dev/null
  412. +++ b/spec/unit/amqp/bit_set_spec.rb
  413. @@ -0,0 +1,127 @@
  414. +# encoding: utf-8
  415. +
  416. +require 'spec_helper'
  417. +
  418. +require "amqp/bit_set"
  419. +
  420. +
  421. +describe AMQP::BitSet do
  422. +
  423. + #
  424. + # Environment
  425. + #
  426. +
  427. + let(:nbits) { (1 << 16) - 1 }
  428. +
  429. +
  430. + #
  431. + # Examples
  432. + #
  433. +
  434. + describe "#get, #[]" do
  435. + describe "when bit at given position is set" do
  436. + subject do
  437. + o = described_class.new(nbits)
  438. + o.set(3)
  439. + o
  440. + end
  441. +
  442. + it "returns true" do
  443. + subject.get(3).should be_true
  444. + end # it
  445. + end # describe
  446. +
  447. + describe "when bit at given position is off" do
  448. + subject do
  449. + described_class.new(nbits)
  450. + end
  451. +
  452. + it "returns false" do
  453. + subject.get(5).should be_false
  454. + end # it
  455. + end # describe
  456. + end # describe
  457. +
  458. +
  459. + describe "#set" do
  460. + describe "when bit at given position is set" do
  461. + subject do
  462. + described_class.new(nbits)
  463. + end
  464. +
  465. + it "has no effect" do
  466. + subject.set(3)
  467. + subject.get(3).should be_true
  468. + subject.set(3)
  469. + subject[3].should be_true
  470. + end # it
  471. + end
  472. +
  473. + describe "when bit at given position is off" do
  474. + subject do
  475. + described_class.new(nbits)
  476. + end
  477. +
  478. + it "sets that bit" do
  479. + subject.set(3)
  480. + subject.get(3).should be_true
  481. +
  482. + subject.set(33)
  483. + subject.get(33).should be_true
  484. +
  485. + subject.set(3387)
  486. + subject.get(3387).should be_true
  487. + end
  488. + end # describe
  489. + end # describe
  490. +
  491. +
  492. + describe "#unset" do
  493. + describe "when bit at a given position is set" do
  494. + subject do
  495. + described_class.new(nbits)
  496. + end
  497. +
  498. + it "unsets that bit" do
  499. + subject.set(3)
  500. + subject.get(3).should be_true
  501. + subject.unset(3)
  502. + subject.get(3).should be_false
  503. + end # it
  504. + end # describe
  505. +
  506. +
  507. + describe "when bit at a given position is off" do
  508. + subject do
  509. + described_class.new(nbits)
  510. + end
  511. +
  512. + it "has no effect" do
  513. + subject.get(3).should be_false
  514. + subject.unset(3)
  515. + subject.get(3).should be_false
  516. + end # it
  517. + end # describe
  518. + end # describe
  519. +
  520. +
  521. +
  522. + describe "#clear" do
  523. + subject do
  524. + described_class.new(nbits)
  525. + end
  526. +
  527. + it "clears all bits" do
  528. + subject.set(3)
  529. + subject.get(3).should be_true
  530. +
  531. + subject.set(7668)
  532. + subject.get(7668).should be_true
  533. +
  534. + subject.clear
  535. +
  536. + subject.get(3).should be_false
  537. + subject.get(7668).should be_false
  538. + end
  539. + end
  540. +end
  541. diff --git a/spec/unit/amqp/channel_id_allocation_spec.rb b/spec/unit/amqp/channel_id_allocation_spec.rb
  542. new file mode 100644
  543. index 0000000..4148482
  544. --- /dev/null
  545. +++ b/spec/unit/amqp/channel_id_allocation_spec.rb
  546. @@ -0,0 +1,38 @@
  547. +require "spec_helper"
  548. +
  549. +describe AMQP::Channel do
  550. + describe ".next_channel_id" do
  551. + before :all do
  552. + described_class.reset_channel_id_allocator
  553. + end
  554. +
  555. + context "when there is a channel id available for allocation" do
  556. + it "returns that channel id" do
  557. + 1024.times { described_class.next_channel_id }
  558. +
  559. + described_class.next_channel_id.should == 1025
  560. + end
  561. + end
  562. +
  563. + context "when THERE IS NO channel id available for allocation" do
  564. + it "raises an exception"
  565. + end
  566. + end
  567. +
  568. +
  569. +
  570. + describe ".release_channel_id" do
  571. + before :all do
  572. + described_class.reset_channel_id_allocator
  573. + end
  574. +
  575. + it "releases that channel id" do
  576. + 1024.times { described_class.next_channel_id }
  577. + described_class.next_channel_id.should == 1025
  578. +
  579. + described_class.release_channel_id(128)
  580. + described_class.next_channel_id.should == 128
  581. + described_class.next_channel_id.should == 1026
  582. + end
  583. + end
  584. +end
  585. diff --git a/spec/unit/amqp/int_allocator_spec.rb b/spec/unit/amqp/int_allocator_spec.rb
  586. new file mode 100644
  587. index 0000000..c8c496e
  588. --- /dev/null
  589. +++ b/spec/unit/amqp/int_allocator_spec.rb
  590. @@ -0,0 +1,116 @@
  591. +# encoding: utf-8
  592. +
  593. +require 'spec_helper'
  594. +require "amqp/int_allocator"
  595. +
  596. +describe AMQP::IntAllocator do
  597. +
  598. + #
  599. + # Environment
  600. + #
  601. +
  602. + subject do
  603. + described_class.new(1, 5)
  604. + end
  605. +
  606. +
  607. + # ...
  608. +
  609. +
  610. + #
  611. + # Examples
  612. + #
  613. +
  614. + describe "#number_of_bits" do
  615. + it "returns number of bits available for allocation" do
  616. + subject.number_of_bits.should == 4
  617. + end
  618. + end
  619. +
  620. +
  621. + describe "#hi" do
  622. + it "returns upper bound of the allocation range" do
  623. + subject.hi.should == 5
  624. + end
  625. + end
  626. +
  627. + describe "#lo" do
  628. + it "returns lower bound of the allocation range" do
  629. + subject.lo.should == 1
  630. + end
  631. + end
  632. +
  633. +
  634. + describe "#allocate" do
  635. + context "when integer in the range is available" do
  636. + it "returns allocated integer" do
  637. + subject.allocate.should == 1
  638. + subject.allocate.should == 2
  639. + subject.allocate.should == 3
  640. + subject.allocate.should == 4
  641. +
  642. + subject.allocate.should == -1
  643. + end
  644. + end
  645. +
  646. + context "when integer in the range IS NOT available" do
  647. + it "returns -1" do
  648. + 4.times { subject.allocate }
  649. +
  650. + subject.allocate.should == -1
  651. + subject.allocate.should == -1
  652. + subject.allocate.should == -1
  653. + subject.allocate.should == -1
  654. + end
  655. + end
  656. + end
  657. +
  658. +
  659. + describe "#free" do
  660. + context "when the integer WAS allocated" do
  661. + it "frees that integer" do
  662. + 4.times { subject.allocate }
  663. + subject.allocate.should == -1
  664. +
  665. + subject.free(1)
  666. + subject.allocate.should == 1
  667. + subject.allocate.should == -1
  668. + subject.free(2)
  669. + subject.allocate.should == 2
  670. + subject.allocate.should == -1
  671. + subject.free(3)
  672. + subject.allocate.should == 3
  673. + subject.allocate.should == -1
  674. + end
  675. + end
  676. +
  677. + context "when the integer WAS NOT allocated" do
  678. + it "has no effect" do
  679. + 32.times { subject.free(1) }
  680. + subject.allocate.should == 1
  681. + end
  682. + end
  683. + end
  684. +
  685. +
  686. + describe "#allocated?" do
  687. + context "when given position WAS allocated" do
  688. + it "returns true" do
  689. + 3.times { subject.allocate }
  690. +
  691. + subject.allocated?(1).should be_true
  692. + subject.allocated?(2).should be_true
  693. + subject.allocated?(3).should be_true
  694. + end
  695. + end
  696. +
  697. + context "when given position WAS NOT allocated" do
  698. + it "returns false" do
  699. + 2.times { subject.allocate }
  700. +
  701. + subject.allocated?(3).should be_false
  702. + subject.allocated?(4).should be_false
  703. + end
  704. + end
  705. + end
  706. +end
  707. --
  708. 1.7.5.4
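
BitSet#set, #get and #unset in the patch above build their mask with 1 << i, using the absolute bit index. That works because Ruby integers are arbitrary precision, but words past the first grow beyond 64 bits. A sketch of word-relative addressing, closer to what java.util.BitSet does, follows. WordBitSet and its helpers are hypothetical names, and unlike the patch this version starts with zeroed words and raises on out-of-range indexes.

```ruby
# Hypothetical class for illustration; not the AMQP::BitSet from the patch.
class WordBitSet
  ADDRESS_BITS_PER_WORD = 6
  BITS_PER_WORD         = 1 << ADDRESS_BITS_PER_WORD # 64

  def initialize(nbits)
    raise ArgumentError, "nbits must be positive" unless nbits > 0

    @nbits = nbits
    # One zeroed word per 64 bits, rounding up.
    @words = Array.new(((nbits - 1) >> ADDRESS_BITS_PER_WORD) + 1, 0)
  end

  def set(i)
    check_range(i)
    @words[i >> ADDRESS_BITS_PER_WORD] |= mask(i)
    nil
  end

  def unset(i)
    check_range(i)
    @words[i >> ADDRESS_BITS_PER_WORD] &= ~mask(i)
    nil
  end

  def get(i)
    check_range(i)
    (@words[i >> ADDRESS_BITS_PER_WORD] & mask(i)) != 0
  end

  private

  # Mask for bit i relative to the start of its word, so each word stays below 2**64.
  def mask(i)
    1 << (i % BITS_PER_WORD)
  end

  def check_range(i)
    raise IndexError, "bit #{i} is outside 0...#{@nbits}" unless i >= 0 && i < @nbits
  end
end

bits = WordBitSet.new(130)
bits.set(64)  # lands in the second word, at offset 0
bits.get(64)  # true
bits.get(0)   # false: offset 0 of the first word is untouched
```

Raising on out-of-range indexes is a design choice of this sketch; the patch instead lets the words array answer for whatever index it is given.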