Skip to content

Instantly share code, notes, and snippets.

@oivoodoo
Created April 3, 2017 08:04
Show Gist options
  • Save oivoodoo/2a52322b7d89fcb7bb0363587f1f0c37 to your computer and use it in GitHub Desktop.
Save oivoodoo/2a52322b7d89fcb7bb0363587f1f0c37 to your computer and use it in GitHub Desktop.
spec_helper.rb
require 'rspec'
require 'ostruct'
require 'excon'
require 'docker'
require 'securerandom'
Excon.defaults[:write_timeout] = 20000
Excon.defaults[:read_timeout] = 20000
class String
def squish!
gsub!(/\A[[:space:]]+/, '')
gsub!(/[[:space:]]+\z/, '')
gsub!(/[[:space:]]+/, ' ')
self
end
end
module Helpers
def normalize(query)
# - sql could contains comments like -- COMMENT
# we should delete them because of squish! could break sql layout.
query = query.to_s.gsub(/\-\-.+$/,'').chomp(";").squish!
"#{query};\r\n"
end
module_function :normalize
ROOT_PATH = File.join(File.dirname(File.expand_path(__FILE__)), '..')
SCHEMA_PATH = File.join(ROOT_PATH, 'schema')
DATABASES_SQL = %Q{
CREATE DATABASE IF NOT EXISTS test;
CREATE DATABASE IF NOT exists prod; -- your database required by app-schema
}
OUT_PATH = "/app/out"
LOCAL_OUT_PATH = File.join(ROOT_PATH, 'out')
SCHEME_SQL = DATABASES_SQL + [
"hive-schema-app.hql",
"hive-schema-test.hql", # create testable tables to load from txt file
].map do |file_path|
query_path = File.join(SCHEMA_PATH, file_path)
query = Helpers.normalize(File.read(query_path))
query
end.join
def load_data(pre_table, table, txt_path, partitions = [])
cs = partitions.join(',')
if cs != ""
cs = " PARTITION (#{cs})"
end
%Q{
LOAD DATA LOCAL INPATH '/app/spec/fixtures/#{txt_path}' OVERWRITE INTO TABLE #{pre_table};
INSERT OVERWRITE TABLE #{table} #{cs} SELECT * FROM #{pre_table};
}.squish!
end
def path_for(filename)
File.join(ROOT_PATH, "queries", filename)
end
class HiveOptions
attr_reader :options
def initialize(options)
@options = options || {}
end
def to_s
options.map { |k, v| "--hiveconf #{k}=#{v}" }.join(' ')
end
end
def _get_settings(options = {})
OpenStruct.new(
debug_enabled: options.fetch(:debug) { false },
hive_options: HiveOptions.new(options.fetch(:hive) { {} }),
query_options: options.fetch(:query_options) { [] })
end
def start_container
puts "======== Begin start container..."
container = Docker::Container.create(
'Image' => ENV['DOCKER_IMAGE_NAME'],
"HostConfig" => {
'Binds' => ["#{ROOT_PATH}:/app:rw"],
},
"Env": [
"HIVE_OPTS=" + [
"--hiveconf mapreduce.map.memory.mb=2048",
"--hiveconf mapreduce.reduce.memory.mb=2048",
"--hiveconf mapreduce.reduce.java.opts=-Xmx2024m",
"--hiveconf mapreduce.map.java.opts=-Xmx2024m"].join(' '),
],
"Volumes": {"#{ROOT_PATH}": {}},
'Tty' => true)
response = container.start
@container_id = response.id
puts "======== Done start container."
response.id
end
def stop_container
puts "======== Begin stop container..."
container = Docker::Container.get(container_id)
container.kill
puts "======== Done stop container."
end
def container_id
@container_id
end
# run inside of container prepared query(shoudl contain hive custom udf
# functions before to run the queries because of having in the prod
# create-udf.hql
def run(local_query_path, settings)
# insert fuctions before to run hive queries
query = normalize(
settings.query_options.join +
File.read(local_query_path))
name = new_name
local_path = File.join(LOCAL_OUT_PATH, name)
remote_path = File.join(OUT_PATH, name)
File.write(local_path, query)
container = Docker::Container.get(container_id)
command = ["/bin/bash", "-c", "/hive/bin/hive #{settings.hive_options} < #{remote_path}"]
response, _, _ = container.exec(command, tty: true)
if ENV['DEBUG'] or settings.debug_enabled
File.open("/tmp/debug.txt", "w+") do |file|
file.write(response.join)
end
end
return response
end
def new_name
"#{SecureRandom.uuid}.tmp"
end
def fetch(query_path, options = {})
fetch_query File.read(query_path), options
end
def fetch_query(query, options = {})
settings = _get_settings(options)
name = new_name
folder_name = name.gsub('.tmp', '.parts')
output_path = File.join(LOCAL_OUT_PATH, name)
query = """
INSERT OVERWRITE LOCAL DIRECTORY '#{File.join(OUT_PATH, folder_name)}'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\\t'
LINES TERMINATED BY '\\n'
#{query};
""".squish!
query = normalize(query)
local_path = File.join(LOCAL_OUT_PATH, name)
File.write(local_path, query)
run(local_path, settings)
# INSERT OVERWRITE is producing 0000_0 ... files there
`cat #{File.join(LOCAL_OUT_PATH, folder_name)}/* > #{output_path}`
lines = File.readlines(output_path).map do |line|
line.strip.split("\t")
end
unless settings.debug_enabled
`rm #{File.join(LOCAL_OUT_PATH, '*.tmp')} || true`
`rm -rf #{File.join(LOCAL_OUT_PATH, '*.parts/')} || true`
end
lines
end
def execute(local_path, options = {})
settings = _get_settings(options)
run(local_path, settings)
end
def execute_query(query, options = {})
if query.is_a? Array
query = query.map { |q| Helpers.normalize(q) }.join(';')
end
query = normalize(query)
settings = _get_settings(options)
name = new_name
local_path = File.join(LOCAL_OUT_PATH, name)
File.write(local_path, query)
run(local_path, settings)
end
end
RSpec.configure do |config|
config.include Helpers
config.mock_with :rspec do |mocks|
mocks.verify_partial_doubles = true
end
config.pattern = '*_spec.rb'
config.disable_monkey_patching!
config.order = :random
config.expose_dsl_globally = true
config.before(:all) do
start_container
execute_query Helpers::SCHEME_SQL
end
config.after(:all) do
stop_container
end
Kernel.srand config.seed
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment