#==============================================================================
# ** Drago - Replicated Fiber
# Version : 1.01
# Author  : LiTTleDRAgo
#------------------------------------------------------------------------------
#  Fallback implementations of Fiber, Queue and FiberError for interpreters
#  that do not define them. Each definition is guarded by `defined?`, so an
#  existing implementation is never replaced.
#==============================================================================
($imported ||= {})[:drg_replicated_fiber] = 1.01
#==============================================================================
# ** Fiber
#------------------------------------------------------------------------------
#  Thread-backed stand-in for Fiber. Every fiber owns one thread and two
#  queues: @resume carries arguments from the resumer into the fiber, @yield
#  carries values from the fiber back to the resumer. Values always travel
#  wrapped in an Array.
#==============================================================================
unless defined?(Fiber)
  class Fiber
    #-------------------------------------------------------------------------
    # * New method: initialize
    #   Requires a block; raises ArgumentError without one. The block does
    #   not start running until the first call to #resume, because the worker
    #   thread first waits on @resume.
    #-------------------------------------------------------------------------
    def initialize
      block_given? || raise(ArgumentError, 'new Fiber requires a block')
      @yield    = Queue.new
      @resume   = Queue.new
      @finished = false
      @thread   = Thread.new do
        value = yield(*@resume.pop)
        # Mark the fiber dead BEFORE handing the final value back. Otherwise
        # the resumer can wake up while this thread is still winding down,
        # see alive? == true, and block forever on the next resume.
        @finished = true
        @yield.push [value]
      end
      @thread.abort_on_exception = true
      @thread[:fiber] = self
    end
    #-------------------------------------------------------------------------
    # * New method: self.current
    #   Returns the fiber that owns the current thread; raises FiberError
    #   when the current thread does not belong to a fiber.
    #-------------------------------------------------------------------------
    def self.current
      Thread.current[:fiber] || raise(FiberError, 'not inside a fiber', caller(1))
    end
    #-------------------------------------------------------------------------
    # * New method: self.yield
    #   Suspends the current fiber and hands args to its resumer. Returns
    #   whatever the next #resume call passes in. Raises RuntimeError when
    #   the current thread does not belong to a fiber.
    #-------------------------------------------------------------------------
    def self.yield(*args)
      (fiber = Thread.current[:fiber]) || (raise "can't yield from root fiber")
      fiber.yield(*args)
    end
    #-------------------------------------------------------------------------
    # * New method: inspect
    #-------------------------------------------------------------------------
    def inspect
      "#<#{self.class}:0x#{self.object_id.to_s(16)}>"
    end
    #-------------------------------------------------------------------------
    # * New method: resume
    #   Passes args into the fiber and blocks until it yields or finishes.
    #   Returns the single yielded value, or an Array when several values
    #   were yielded. Raises FiberError when the fiber is no longer alive.
    #-------------------------------------------------------------------------
    def resume(*args)
      alive? || raise(FiberError, 'dead fiber called', caller(1))
      @resume.push(args)
      result = @yield.pop
      result.size > 1 ? result : result.first
    end
    #-------------------------------------------------------------------------
    # * New method: alive?
    #   False once the fiber's block has returned, or once its thread has
    #   stopped for any other reason.
    #-------------------------------------------------------------------------
    def alive?
      !@finished && @thread.alive?
    end
    #-------------------------------------------------------------------------
    # * New method: yield
    #   Instance-level half of Fiber.yield: publishes args to the resumer,
    #   then blocks until the next #resume supplies new values.
    #-------------------------------------------------------------------------
    def yield(*args)
      @yield.push(args)
      result = @resume.pop
      result.size > 1 ? result : result.first
    end
  end
end
#==============================================================================
# ** Queue
#------------------------------------------------------------------------------
#  Minimal blocking FIFO built on Thread.critical / Thread.stop. Only defined
#  when the interpreter does not already provide Queue.
#==============================================================================
unless defined?(Queue)
  class Queue
    #-------------------------------------------------------------------------
    # * New method: initialize
    #-------------------------------------------------------------------------
    def initialize
      (@que     ||= []).taint
      (@waiting ||= []).taint # enable tainted communication
      self.taint
    end
    #-------------------------------------------------------------------------
    # * New method: clear
    #-------------------------------------------------------------------------
    def clear
      @que.clear
    end
    #-------------------------------------------------------------------------
    # * New method: empty?
    #-------------------------------------------------------------------------
    def empty?
      @que.empty?
    end
    #-------------------------------------------------------------------------
    # * New method: length
    #-------------------------------------------------------------------------
    def length
      @que.length
    end
    #-------------------------------------------------------------------------
    # * New method: num_waiting
    #   Number of threads currently registered as waiting in #pop.
    #-------------------------------------------------------------------------
    def num_waiting
      @waiting.size
    end
    #-------------------------------------------------------------------------
    # * New method: pop
    #   Removes and returns the oldest item. While the queue is empty the
    #   calling thread registers itself in @waiting and stops, unless
    #   non_block is true, in which case ThreadError is raised instead.
    #-------------------------------------------------------------------------
    def pop(non_block=false)
      # Thread.critical is re-armed on every pass because the thread was
      # stopped in between and other threads have run since.
      while (Thread.critical = true; @que.empty?)
        non_block && raise(ThreadError, "queue empty", caller(1))
        @waiting.push(Thread.current)
        Thread.stop
      end
      @que.shift
    ensure
      Thread.critical = false
    end
    #-------------------------------------------------------------------------
    # * New method: push
    #   Appends obj and wakes the longest-waiting thread, if there is one.
    #-------------------------------------------------------------------------
    def push(obj)
      Thread.critical = true
      @que.push obj
      begin
        t = @waiting.shift
        t && t.wakeup
      rescue ThreadError
        # The waiter could not be woken; try the next one in line.
        retry
      ensure
        Thread.critical = false
      end
      begin
        t && t.run
      rescue ThreadError
      end
    end
    #-------------------------------------------------------------------------
    # * Alias Listing
    #-------------------------------------------------------------------------
    method_defined?(:size)  || (alias_method :size,  :length)
    method_defined?(:shift) || (alias_method :shift, :pop)
    method_defined?(:deq)   || (alias_method :deq,   :pop)
    method_defined?(:"<<")  || (alias_method :"<<",  :push)
    method_defined?(:enq)   || (alias_method :enq,   :push)
  end
end
#==============================================================================
# ** FiberError
#------------------------------------------------------------------------------
#  Raised by Fiber.current outside a fiber and by Fiber#resume on a dead
#  fiber. Only defined when the interpreter does not already provide it.
#==============================================================================
defined?(FiberError) || (class FiberError < StandardError ; end)