- # master.rb
- #$:.unshift('~/w/ruote/lib')
- #$:.unshift('~/w/ruote-redis/lib')
- require 'rubygems'
- require 'rufus-json/automatic'
- require 'ruote'
- require 'redis'
- require 'ruote-redis'
- require 'net/ssh'
- #require 'copy_participant.rb'
# Local participant that prepares and starts a remote "slave" engine.
#
# For the slave index passed as the participant's `nb` parameter it:
#   1. looks up the slave's host name in the workitem,
#   2. registers a "slave-<nb>" Ruote::EngineParticipant on the current engine,
#      backed by Redis database 11 + nb on 127.0.0.1,
#   3. copies slave.rb to /root/test_ruote on that host with scp,
#   4. starts slave.rb over SSH,
#   5. replies to the engine.
class CopyParticipant
  include Ruote::LocalParticipant

  # Receives the workitem and performs the steps listed on the class.
  #
  # workitem - the workitem; fields['params']['nb'] holds the slave index and
  #            fields['addresses'] the list of slave host names.
  #
  # Raises ArgumentError when no host name is found for the index.
  def consume(workitem)
    id = workitem.fields['params']['nb'].to_i
    address = slave_address(workitem, id)

    register_slave_engine(id)
    copy_slave_script(address)
    start_slave_script

    puts "Fin de CopyParticipant"
    reply_to_engine(workitem)
  end

  private

  # Returns the host name stored at index +id+ of the workitem's address list.
  def slave_address(workitem, id)
    # The misspelled 'adresses' key is accepted as well, so that workitems
    # populated under either spelling resolve to a host.
    addresses = workitem.fields['addresses'] || workitem.fields['adresses']
    address = addresses && addresses[id]

    if address.nil? || address.to_s.empty?
      raise ArgumentError, "no slave address found at index #{id}"
    end

    address
  end

  # Registers the "slave-<id>" engine participant on the current engine.
  def register_slave_engine(id)
    opts = {
      'host' => '127.0.0.1',
      'db' => 11 + id,
      'thread_safe' => true }

    context.engine.register_participant(
      "slave-#{id}",
      Ruote::EngineParticipant,
      'storage_class' => Ruote::Redis::Storage,
      'storage_args' => opts)
  end

  # Copies slave.rb (from the current directory) to +address+.
  def copy_slave_script(address)
    # Multi-argument form: the address is handed to scp as a plain argument
    # and is never interpreted by a shell.
    copied = system('scp', 'slave.rb', "root@#{address}:/root/test_ruote")
    warn "scp of slave.rb to #{address} failed" unless copied
  end

  # Starts slave.rb on the remote machine over SSH.
  def start_slave_script
    # NOTE(review): the SSH host is hard-coded while the copy goes to the
    # address taken from the workitem -- confirm both are meant to be the
    # same machine.
    # NOTE(review): ssh.exec only queues the command (it does not wait for
    # it); presumably intended since slave.rb never returns -- confirm the
    # command is really started before the session is closed.
    Net::SSH.start('172.16.65.81', 'root') do |ssh|
      ssh.exec("ruby /root/test_ruote/slave.rb")
    end
  end
end
# Storage of the master engine: Redis database 10 on the local machine.
master_storage = Ruote::Redis::Storage.new(
  'host' => '127.0.0.1',
  'db' => 10,
  'thread_safe' => true,
  'engine_id' => 'master')

# The master engine gets its own worker on top of that storage.
master_worker = Ruote::Worker.new(master_storage)
master = Ruote::Engine.new(master_worker)

# Noisy mode on (presumably makes the engine trace its activity -- see the
# ruote documentation).
master.noisy = true
- #slave = Ruote::Engine.new(Ruote::Worker.new(Ruote::Redis::Storage.new('host' => '127.0.0.1', 'db' => 12, 'thread_safe' => true, 'engine_id' => 'slave-0')))
- #slave.register_participant :alpha do |workitem|
- # puts "Hello, I'm alpha"
- #end
- #slave.register_participant('master', Ruote::EngineParticipant, 'storage_class' => Ruote::Redis::Storage, 'storage_args' => {'host'=>'127.0.0.1','db'=>10,'thread_safe'=>true})
# :init_workitem seeds the workitem with the host names of the slave machines.
# The list is stored under 'addresses', the key CopyParticipant#consume reads.
master.register_participant :init_workitem do |workitem|
  workitem.fields['addresses'] = %w[griffon-79 griffon-81]
end

# :copy_to_slave_loc is handled by CopyParticipant.
master.register_participant :copy_to_slave_loc, CopyParticipant
- #master.register_participant('slave-0', Ruote::EngineParticipant, 'storage_class' => Ruote::Redis::Storage, 'storage_args' => {'host'=>'127.0.0.1', 'db'=>12, 'thread_safe'=>true})
# Process definition launched on the master engine.
pdef = Ruote.process_definition do
  # Fill the workitem before anything else runs.
  participant(:init_workitem)

  concurrent_iterator(:times => 1) do
    # Copy the slave file onto the slave computer
    # (the :forget => true option is currently disabled).
    participant(:copy_to_slave_loc, :nb => '0')
    # Launch the subprocess with the slave engine.
    subprocess(:sub, :engine => 'slave-0')
  end

  # Same subprocess again, once the iterator is done.
  subprocess(:sub, :engine => 'slave-0')

  # Body of the :sub subprocess: a single :alpha participant.
  define(:sub) do
    participant(:alpha)
  end
end

# Launch the process on the master engine and wait for it.
wfid = master.launch(pdef)
master.wait_for(wfid)
- ################################################################################
- ################################################################################
- # slave.rb
- $:.unshift('~/w/ruote/lib')
- $:.unshift('~/w/ruote-redis/lib')
- require 'rubygems'
- require 'rufus-json/automatic'
- require 'ruote'
- require 'redis'
- require 'ruote-redis'
- #require 'yajl' rescue require 'json'
# Storage of the slave engine: Redis database 11 on localhost, engine id
# 'slave-0'.
slave_storage = Ruote::Redis::Storage.new(
  'host' => 'localhost',
  'db' => 11,
  'thread_safe' => true,
  'engine_id' => 'slave-0')

# The slave engine gets its own worker on top of that storage.
slave_worker = Ruote::Worker.new(slave_storage)
slave = Ruote::Engine.new(slave_worker)

# Noisy mode on (presumably makes the engine trace its activity -- see the
# ruote documentation).
slave.noisy = true
# Connection settings for the storage the 'master' participant points at:
# Redis database 10 on localhost.
master_storage_args = {
  'host' => 'localhost',
  'db' => 10,
  'thread_safe' => true
}

# Register 'master' as an engine participant on the slave engine.
slave.register_participant(
  'master',
  Ruote::EngineParticipant,
  'storage_class' => Ruote::Redis::Storage,
  'storage_args' => master_storage_args)
# :alpha only prints a greeting; the workitem itself is not touched.
slave.register_participant(:alpha) do |workitem|
  puts "Hello, je suis alpha"
end

# Join the slave engine (presumably blocks while its worker runs -- see the
# ruote documentation).
slave.join