Skip to content

Instantly share code, notes, and snippets.

@jmettraux
Created August 19, 2011 16:38
Show Gist options
  • Save jmettraux/1157286 to your computer and use it in GitHub Desktop.
Save jmettraux/1157286 to your computer and use it in GitHub Desktop.
#!/usr/bin/env ruby
#$:.unshift('~/w/ruote/lib')
#$:.unshift('~/w/ruote-redis/lib')
require 'rubygems'
require 'redis'
require 'ruote'
require 'ruote-redis'
# Prefer yajl and fall back to json when yajl cannot be loaded.
# A `rescue` modifier only catches StandardError; LoadError is a ScriptError,
# so the fallback has to name LoadError explicitly or it never runs.
begin
  require 'yajl'
rescue LoadError
  require 'json'
end
require 'rufus-json'
Rufus::Json.detect_backend
#require 'ResultParticipant.rb'
#require 'SshParticipant.rb'
# <-- avoid 'camel-case' in ruby filenames, the convention is:
#require 'result_participant.rb'
#require 'ssh_participant.rb'
# Stub participant: writes a fixed value into the workitem's 'result' field
# and replies to the engine.
class ResultParticipant
  include Ruote::LocalParticipant

  # Sets fields['result'] on the workitem, then passes the workitem to
  # reply_to_engine.
  def consume(workitem)
    fields = workitem.fields
    fields['result'] = 'so so'

    reply_to_engine(workitem)
  end
end
# Stub participant standing in for an SSH call: it runs no command, it only
# records a canned message in the workitem's 'ssh' field.
class SshParticipant
  include Ruote::LocalParticipant

  # Sets fields['ssh'] on the workitem, then passes the workitem to
  # reply_to_engine.
  def consume(workitem)
    fields = workitem.fields
    fields['ssh'] = 'got permission denied...'

    reply_to_engine(workitem)
  end
end
# Master engine, built bottom-up: Redis storage (db 12, engine_id 'master'),
# a worker on top of that storage, and the engine wrapping the worker.
master_storage = Ruote::Redis::Storage.new(
  'host' => '127.0.0.1',
  'db' => 12,
  'thread_safe' => true,
  'engine_id' => 'master')
master_worker = Ruote::Worker.new(master_storage)
master = Ruote::Engine.new(master_worker)
# Slave engine, built bottom-up: Redis storage (db 13, engine_id 'slave1'),
# a worker on top of that storage, and the engine wrapping the worker.
slave_storage = Ruote::Redis::Storage.new(
  'host' => '127.0.0.1',
  'db' => 13,
  'thread_safe' => true,
  'engine_id' => 'slave1')
slave_worker = Ruote::Worker.new(slave_storage)
slave = Ruote::Engine.new(slave_worker)
#master.storage.purge!
#slave.storage.purge!
# Register an engine participant named 'slave1' on the master. Its storage
# arguments use Redis db 13, the same db the slave engine's storage is
# created with.
slave_redis_args = {
  'host' => '127.0.0.1',
  'db' => 13,
  'thread_safe' => true
}

master.register_participant(
  'slave1',
  Ruote::EngineParticipant,
  'storage_class' => Ruote::Redis::Storage,
  'storage_args' => slave_redis_args)
# Register an engine participant named 'master' on the slave. Its storage
# arguments use Redis db 12, the same db the master engine's storage is
# created with.
master_redis_args = {
  'host' => '127.0.0.1',
  'db' => 12,
  'thread_safe' => true
}

slave.register_participant(
  'master',
  Ruote::EngineParticipant,
  'storage_class' => Ruote::Redis::Storage,
  'storage_args' => master_redis_args)
# Turn on noisy mode for the master engine only.
master.noisy = true
#slave.noisy = true

# Both engines get the same two stub participants, master first, with 'ssh'
# registered before 'emit_results' on each engine.
[master, slave].each do |engine|
  engine.register_participant 'ssh', SshParticipant
  engine.register_participant 'emit_results', ResultParticipant
end
# Process definition launched on the master:
#   1. iterate over the 'ips' field (current value placed in 'ip') and call
#      the 'ssh' participant for each one;
#   2. invoke the :sub subprocess with :engine => 'slave1';
#   3. :sub itself only calls the 'emit_results' participant.
pdef = Ruote.process_definition do
  iterator(:on_f => 'ips', :to_f => 'ip', :merge_type => 'concat') do
    ssh(:ip => '${ip}', :command => 'du -f')
  end

  subprocess(:sub, :engine => 'slave1')

  define(:sub) do
    emit_results
  end
end
# Launch the definition on the master with a single-entry 'ips' field, then
# wait on the returned workflow id.
wfid = master.launch(pdef, 'ips' => ['192.168.1.1'])
master.wait_for(wfid)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment