@cronin101
Last active December 26, 2015 07:39
hshell.rb: no more typing $ hadoop dfs - in front of every HDFS command.

Starting up:

[bw1425n01]s0925570: ./hshell.rb

Clearing previous output:

s0925570@hadoop $ rmr ~/data/output
Deleted hdfs://bw1425n01.inf.ed.ac.uk/user/s0925570/data/output
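Lines that are not :commands are passed to hadoop dfs unchanged except for one substitution: every ~ becomes the user's HDFS home, /user/<username>. Below is a minimal sketch of that translation as a standalone function. It assumes the /opt/hadoop/hadoop-0.20.2 install path hard-coded in hshell.rb, and dfs_command is a hypothetical name that is not part of the script.

```ruby
# Hypothetical helper mirroring hshell.rb's fallback path: turn a shell line
# into the `hadoop dfs` invocation it stands for.
HADOOP = '/opt/hadoop/hadoop-0.20.2/bin/hadoop' # install path taken from hshell.rb

def dfs_command(line, user)
  # Every "~" is replaced with the user's HDFS home directory, /user/<user>.
  expanded = line.gsub('~', "/user/#{user}")
  "#{HADOOP} dfs -#{expanded}"
end

puts dfs_command('rmr ~/data/output', 's0925570')
# prints: /opt/hadoop/hadoop-0.20.2/bin/hadoop dfs -rmr /user/s0925570/data/output
```

Because the substitution is a plain gsub, a ~ anywhere in the line is expanded, not only at the start of a path.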

Running a streaming job:

s0925570@hadoop $ :stream
        Input: ~/data/input
        Output: ~/data/output
        Mapper: ./mapper.py
        Reducer: ./reducer.py
To run this streaming job again, use the shortcut:
:stream --- \n:mapper: "./mapper.py "\n:reducer: "./reducer.py "\n:input: ~/data/input\n:output: ~/data/output\n
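The shortcut is the job's settings hash dumped to YAML, with each newline replaced by the two characters \ and n so the whole job fits on one line of Readline history. Here is a sketch of that round trip. It assumes Ruby's bundled Psych YAML library, and encode_shortcut and decode_shortcut are hypothetical names.

```ruby
require 'yaml'

# Encode job settings as a one-line shortcut: dump to YAML, then replace real
# newlines with the two characters '\' 'n'.
def encode_shortcut(settings)
  ':stream ' + settings.to_yaml.gsub("\n", '\n')
end

# Decode: drop the ":stream" prefix, restore the newlines, and load the YAML.
def decode_shortcut(line)
  yaml = line.sub(/\A:stream\s*/, '').gsub('\n', "\n")
  # Psych 4 (Ruby 3.1+) makes YAML.load reject Symbol keys; unsafe_load keeps
  # the older behaviour, acceptable here because the text is the user's own.
  YAML.respond_to?(:unsafe_load) ? YAML.unsafe_load(yaml) : YAML.load(yaml)
end

job = { :input => '~/data/input', :output => '~/data/output',
        :mapper => './mapper.py', :reducer => './reducer.py' }
line = encode_shortcut(job)
puts line
p decode_shortcut(line) == job
```

One limitation of this scheme: a setting that itself contains the characters \n would be corrupted on decode.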

... job runs ...

s0925570@hadoop $ lsr ~/data/output
-rw-r--r--   3 s0925570 supergroup         52 2013-10-23 15:00 /user/s0925570/data/output/part-00000

... etc ...

Running the job (shorthand):

s0925570@hadoop $ :stream --- \n:mapper: "./mapper.py "\n:reducer: "./reducer.py "\n:input: ~/data/input\n:output: ~/data/output\n

Inspecting part of the result:

s0925570@hadoop $ tail ~/data/output/part-00000
But     1
ask     1
both    1
desert  2
jack    161
makes   161
up      6
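mapper.py and reducer.py are not included in this gist. The tab-separated word/count pairs above look like a word count, so the sketch below assumes that. It is a hypothetical Ruby equivalent of the reducer step under the Hadoop streaming contract, where the reducer reads key<TAB>value lines sorted by key on stdin and writes key<TAB>value lines to stdout. reduce_counts is an illustrative name.

```ruby
# Hypothetical word-count reducer for Hadoop streaming (the gist's own
# reducer.py is not shown). Input lines are "word\tcount", sorted by word,
# so all counts for one word arrive consecutively.
def reduce_counts(lines)
  results = []
  current, total = nil, 0
  lines.each do |line|
    word, count = line.chomp.split("\t", 2)
    next if word.nil? || word.empty? # skip blank lines
    if word != current
      results << "#{current}\t#{total}" if current
      current, total = word, 0
    end
    total += count.to_i
  end
  results << "#{current}\t#{total}" if current # flush the last word
  results
end

# As a real streaming reducer: reduce_counts($stdin).each { |l| puts l }
sample = ["both\t1\n", "desert\t1\n", "desert\t1\n", "up\t1\n"]
puts reduce_counts(sample)
```

Summing consecutive runs only works because streaming sorts the map output by key before the reduce phase. The reducer never has to hold more than one word's total in memory.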
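hshell.rb:

#!/usr/bin/env ruby
# Interactive wrapper around `hadoop dfs`: plain lines become
# `hadoop dfs -<line>`, and :stream launches a Hadoop streaming job.
require 'readline'
require 'pty'
require 'yaml'

class HShell
  def initialize
    @me = `whoami`.chomp
    @hpath = '/opt/hadoop/hadoop-0.20.2/'
    @hadoop = "#{@hpath}bin/hadoop"
    @streaming = 'contrib/streaming/hadoop-0.20.2-streaming.jar'
  end

  # Read lines until EOF (Ctrl-D). A line starting with a known :command is
  # dispatched to it; anything else is run as `hadoop dfs -<line>`.
  def loop
    while (command = Readline.readline("#{@me}@hadoop $ ", true))
      next if command.strip.empty?
      if (match = /\A\s*:(\w+)/.match(command)) && commands.key?(name = match[1].to_sym)
        defer name, command
      else
        puts `#{@hadoop} dfs -#{expand_path command}`
      end
    end
  end

  private

  # Expand "~" to the user's HDFS home directory, /user/<username>.
  def expand_path(command)
    command.gsub('~', "/user/#{@me}")
  end

  # Commands available through the :name syntax.
  def commands
    {
      :stream => {
        :description => 'Run a Hadoop streaming job',
        :function => lambda { |args| stream args }
      }
    }
  end

  # Call a :command with the rest of the line (everything after ":name").
  def defer(name, input)
    commands[name][:function].call(input.sub(/\A\s*:\w+\s*/, ''))
  end

  # With no arguments, prompt for the job settings and print a one-line
  # shortcut for rerunning the job; otherwise parse the shortcut's YAML.
  def stream(args)
    if args.strip.empty?
      settings = {
        :input => Readline.readline("\tInput: "),
        :output => Readline.readline("\tOutput: "),
        :mapper => Readline.readline("\tMapper: "),
        :reducer => Readline.readline("\tReducer: ")
      }
      # Escape newlines as a literal \n so the YAML fits on one history line.
      args = settings.to_yaml.gsub("\n", '\n')
      saved = ":stream #{args}"
      puts 'To run this streaming job again, use the shortcut:'
      puts saved
      Readline::HISTORY << saved
    end
    yaml = args.gsub('\n', "\n")
    # Psych 4 (Ruby 3.1+) makes YAML.load reject Symbol keys; unsafe_load keeps
    # the old behaviour, which is fine for the user's own shortcut text.
    parsed = YAML.respond_to?(:unsafe_load) ? YAML.unsafe_load(yaml) : YAML.load(yaml)
    mapper = parsed[:mapper].to_s.strip
    reducer = parsed[:reducer].to_s.strip
    command = [
      "#{@hadoop} jar #{@hpath}#{@streaming}",
      "-input #{expand_path parsed[:input].to_s}",
      "-output #{expand_path parsed[:output].to_s}",
      "-mapper #{mapper} -file #{mapper}",
      "-reducer #{reducer} -file #{reducer}"
    ].join(' ')
    puts command
    begin
      # Run the job under a pseudo-terminal and echo its output as it arrives.
      PTY.spawn(command) { |reader, _writer, _pid| reader.each { |line| puts line } }
    rescue Errno::EIO
      # Linux raises EIO when reading a PTY whose child has exited; treat it as EOF.
    end
  end
end

HShell.new.loop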
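The streaming invocation is just the four settings spliced into a hadoop jar command line. Below is a standalone sketch of that assembly. It assumes the install and jar paths hard-coded in hshell.rb, and streaming_command is a hypothetical helper. The -file options ship the local mapper and reducer scripts with the job so the cluster nodes can run them.

```ruby
# Hypothetical helper: assemble the Hadoop streaming command line from the
# :stream settings. Paths are the constants hard-coded in hshell.rb.
HPATH = '/opt/hadoop/hadoop-0.20.2/'
STREAMING_JAR = 'contrib/streaming/hadoop-0.20.2-streaming.jar'

def streaming_command(settings, user)
  home = lambda { |path| path.gsub('~', "/user/#{user}") }
  # Strip stray whitespace, e.g. the trailing space in "./mapper.py ".
  mapper = settings[:mapper].strip
  reducer = settings[:reducer].strip
  [
    "#{HPATH}bin/hadoop jar #{HPATH}#{STREAMING_JAR}",
    "-input #{home.call(settings[:input])}",
    "-output #{home.call(settings[:output])}",
    # -file ships each local script to the cluster along with the job.
    "-mapper #{mapper} -file #{mapper}",
    "-reducer #{reducer} -file #{reducer}"
  ].join(' ')
end

job = { :input => '~/data/input', :output => '~/data/output',
        :mapper => './mapper.py ', :reducer => './reducer.py ' }
cmd = streaming_command(job, 's0925570')
puts cmd
```

Joining an array of option strings keeps each flag on its own source line without relying on backslash line continuations inside a double-quoted string.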