Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #==============================================================================
- # ** Drago - Replicated Fiber
- # Version : 1.01
- # Author : LiTTleDRAgo
- #==============================================================================
- ($imported ||= {})[:drg_replicated_fiber] = 1.01
- #==============================================================================
- # ** Fiber
- #------------------------------------------------------------------------------
- #
- #==============================================================================
- unless defined?(Fiber)
- class Fiber
- #-------------------------------------------------------------------------
- # * New method: initialize
- #-------------------------------------------------------------------------
- def initialize
- block_given? || raise(ArgumentError,'new Fiber requires a block')
- @yield = Queue.new
- @resume = Queue.new
- @thread = Thread.new { @yield.push [yield(*@resume.pop)] }
- @thread.abort_on_exception = true
- @thread[:fiber] = self
- end
- #-------------------------------------------------------------------------
- # * New method: self.current
- #-------------------------------------------------------------------------
- def self.current
- Thread.current[:fiber] || raise(FiberError,'not inside a fiber',caller(1))
- end
- #-------------------------------------------------------------------------
- # * New method: self.yield
- #-------------------------------------------------------------------------
- def self.yield(*args)
- (fiber = Thread.current[:fiber]) || (raise "can't yield from root fiber")
- fiber.yield(*args)
- end
- #-------------------------------------------------------------------------
- # * New method: inspect
- #-------------------------------------------------------------------------
- def inspect
- "#<#{self.class}:0x#{self.object_id.to_s(16)}>"
- end
- #-------------------------------------------------------------------------
- # * New method: resume
- #-------------------------------------------------------------------------
- def resume(*args)
- alive? || raise(FiberError,'dead fiber called',caller(1))
- @resume.push(args)
- result = @yield.pop
- result.size > 1 ? result : result.first
- end
- #-------------------------------------------------------------------------
- # * New method: alive?
- #-------------------------------------------------------------------------
- def alive?
- @thread.alive?
- end
- #-------------------------------------------------------------------------
- # * New method: yield
- #-------------------------------------------------------------------------
- def yield(*args)
- @yield.push(args)
- result = @resume.pop
- result.size > 1 ? result : result.first
- end
- end
- end
- #==============================================================================
- # ** Queue
- #------------------------------------------------------------------------------
- #
- #==============================================================================
- unless defined?(Queue)
- class Queue
- #-------------------------------------------------------------------------
- # * New method: initialize
- #-------------------------------------------------------------------------
- def initialize
- (@que ||= []).taint
- (@waiting ||= []).taint # enable tainted comunication
- self.taint
- end
- #-------------------------------------------------------------------------
- # * New method: clear
- #-------------------------------------------------------------------------
- def clear
- @que.clear
- end
- #-------------------------------------------------------------------------
- # * New method: empty?
- #-------------------------------------------------------------------------
- def empty?
- @que.empty?
- end
- #-------------------------------------------------------------------------
- # * New method: length
- #-------------------------------------------------------------------------
- def length
- @que.length
- end
- #-------------------------------------------------------------------------
- # * New method: num_waiting
- #-------------------------------------------------------------------------
- def num_waiting
- @waiting.size
- end
- #-------------------------------------------------------------------------
- # * New method: pop
- #-------------------------------------------------------------------------
- def pop(non_block=false)
- while (Thread.critical = true; @que.empty?)
- non_block && raise(ThreadError,"queue empty",caller(1))
- @waiting.push(Thread.current)
- Thread.stop
- end
- @que.shift
- ensure
- Thread.critical = false
- end
- #-------------------------------------------------------------------------
- # * New method: push
- #-------------------------------------------------------------------------
- def push(obj)
- Thread.critical = true
- @que.push obj
- begin
- t = @waiting.shift
- t && t.wakeup
- rescue ThreadError
- retry
- ensure
- Thread.critical = false
- end
- begin
- t && t.run
- rescue ThreadError
- end
- end
- #-------------------------------------------------------------------------
- # * Alias Listing
- #-------------------------------------------------------------------------
- method_defined?(:size) || (alias_method :size, :length)
- method_defined?(:shift) || (alias_method :shift, :pop)
- method_defined?(:deq) || (alias_method :deq, :pop)
- method_defined?(:"<<") || (alias_method :"<<", :push)
- method_defined?(:enq) || (alias_method :enq, :push)
- end
- end
- #==============================================================================
- # ** FiberError
- #------------------------------------------------------------------------------
- #
- #==============================================================================
- defined?(FiberError) || (class FiberError < StandardError ; end)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement