Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- require 'rubygems'
- require 'eventmachine'
- require 'em_netstring'
- require 'fuzzprotocol'
- require 'thread'
- require 'fuzzer'
# State shared between the producer thread and the server connections:
# prod_queue holds the generated test cases, and prod_mutex is the lock that is
# handed to every connection together with the queue.
prod_mutex = Mutex.new
prod_queue = Queue.new

# Background producer: pushes every value the generator yields onto prod_queue
# until the generator reports (via next?) that nothing is left.
prod_thread = Thread.new do
  begin
    # NOTE(review): the meaning of the ("XXXX", 8, 8) arguments is defined by
    # the fuzzer library, not visible here -- confirm against Generators.
    gen = Generators::RollingCorrupt.new("XXXX", 8, 8)
    while gen.next?
      prod_mutex.synchronize { prod_queue << gen.next }
    end
    puts "Production Thread Finished."
  rescue => e
    # Any StandardError ends production; the thread finishes when this block
    # returns, so no explicit Thread#exit is needed.
    puts "Error: Production Thread Exiting: #{e}"
  end
end
# EventMachine connection handler that hands out fuzz test cases.
#
# Incoming bytes are tokenized with NetStringTokenizer. Every complete message
# whose verb is "CLIENT READY" is answered with a "DELIVER" message carrying
# the next item popped from the production queue supplied through set_queue.
module FuzzServer
  # Connection set-up hook: creates the per-connection tokenizer and clears
  # the queue/mutex references until set_queue provides them.
  def post_init
    @handler = NetStringTokenizer.new
    @production_queue = nil
    @production_mutex = nil
  end

  # Handles a chunk of received bytes.
  #
  # data - raw bytes from the peer; passed to the tokenizer, which returns the
  #        messages to process (possibly none).
  #
  # For each "CLIENT READY" message a unit of work is handed to EM.defer so
  # that waiting for a test case does not happen inside this method; the
  # packed response is sent from the callback.
  def receive_data(data)
    @handler.parse(data).each do |raw|
      msg = FuzzMessage.new(raw)
      next unless msg.verb == "CLIENT READY"

      # Builds the packed response; its return value is what the callback gets.
      get_data = proc do
        @handler.pack(FuzzMessage.new({:verb => "DELIVER", :data => next_test_case}).to_yaml)
      end
      # Invoked with the packed response once it is ready.
      callback = proc { |response| send_data response }

      EM.defer(get_data, callback)
    end
  end

  # Supplies the shared production queue and the mutex guarding access to it.
  #
  # q_obj - queue object answering empty? and pop.
  # mutx  - lock object answering synchronize.
  def set_queue(q_obj, mutx)
    @production_queue = q_obj
    @production_mutex = mutx
  end

  private

  # Polls the production queue until it yields a truthy item and returns it.
  # The emptiness check and the pop happen under the mutex so the pair is not
  # interleaved with other holders of the same lock. Between unsuccessful
  # attempts it sleeps for rand(5) seconds (0 to 4).
  def next_test_case
    loop do
      item = @production_mutex.synchronize do
        @production_queue.pop unless @production_queue.empty?
      end
      return item if item
      sleep(rand(5))
    end
  end
end
# Start the reactor and listen on all interfaces ("0.0.0.0"), port 10000, using
# FuzzServer as the connection handler. Each new connection object is handed
# the shared production queue and its mutex.
EventMachine::run do
  EventMachine::start_server("0.0.0.0", 10000, FuzzServer) do |conn|
    conn.set_queue(prod_queue, prod_mutex)
  end
end
Add Comment
Please, Sign In to add comment