Created
          April 3, 2017 08:04 
        
      - 
      
- 
        Save oivoodoo/2a52322b7d89fcb7bb0363587f1f0c37 to your computer and use it in GitHub Desktop. 
    spec_helper.rb
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
require 'fileutils'
require 'ostruct'
require 'securerandom'

require 'docker'
require 'excon'
require 'rspec'
# Excon timeouts are expressed in seconds. Hive queries run inside the
# container can take a very long time, so allow 20000 s (~5.5 h) for both
# reads and writes on the Docker API connection.
{ write_timeout: 20000, read_timeout: 20000 }.each do |setting, seconds|
  Excon.defaults[setting] = seconds
end
# Core extension used to collapse multi-line SQL into a single line.
class String
  # Squishes the receiver in place. It removes leading and trailing whitespace
  # and replaces every remaining whitespace run with a single space.
  # [[:space:]] is used instead of \s so Unicode whitespace (e.g. a
  # non-breaking space) is treated the same way. Always returns self, even
  # when nothing changed (unlike gsub!, which would return nil).
  def squish!
    [
      [/\A[[:space:]]+/, ''],  # leading whitespace
      [/[[:space:]]+\z/, ''],  # trailing whitespace
      [/[[:space:]]+/, ' ']    # inner runs -> one space
    ].each { |pattern, replacement| gsub!(pattern, replacement) }
    self
  end
end
# Helpers mixed into example groups for running Hive queries inside a Docker
# container built from ENV['DOCKER_IMAGE_NAME'].
#
# The project root is bind-mounted at /app in the container. Files written to
# LOCAL_OUT_PATH on the host are therefore visible under OUT_PATH inside it.
module Helpers
  # Collapses a (possibly multi-line) Hive query onto one line terminated by
  # ";\r\n".
  #
  # `-- comment` text is removed first. Once the query is squished onto a
  # single line, any surviving `--` would comment out everything after it.
  # Trailing semicolons/whitespace are dropped before the terminator is
  # appended, so "q", "q;" and "q;\n" all normalize to "q;\r\n".
  #
  # NOTE: comment stripping is purely textual, so a `--` inside a string
  # literal is removed as well.
  #
  # @param query [#to_s] query text; nil is treated as ""
  # @return [String] the normalized single-line query
  def normalize(query)
    query = query.to_s.gsub(/--.*$/, '').squish!.sub(/[[:space:];]+\z/, '')
    "#{query};\r\n"
  end
  module_function :normalize

  # Project root: the parent of the directory that contains this file.
  ROOT_PATH = File.expand_path('..', __dir__)
  SCHEMA_PATH = File.join(ROOT_PATH, 'schema')

  # Databases that must exist before the schema files are applied.
  DATABASES_SQL = %Q{
    CREATE DATABASE IF NOT EXISTS test;
    CREATE DATABASE IF NOT exists prod; -- your database required by app-schema
  }

  # Scratch directory, as seen inside the container and on the host.
  OUT_PATH = "/app/out"
  LOCAL_OUT_PATH = File.join(ROOT_PATH, 'out')

  # Complete schema setup: the databases plus every schema file, normalized.
  # Read once, when this file is loaded.
  SCHEME_SQL = DATABASES_SQL + [
    "hive-schema-app.hql",
    "hive-schema-test.hql", # create testable tables to load from txt file
  ].map { |file_name| Helpers.normalize(File.read(File.join(SCHEMA_PATH, file_name))) }.join

  # Builds SQL that loads the fixture spec/fixtures/+txt_path+ into the
  # staging table +pre_table+ and then copies it into +table+.
  #
  # @param partitions [Array<String>] joined with ',' into a PARTITION (...)
  #   clause; when empty (or nil) no clause is emitted
  # @return [String] both statements on a single line
  def load_data(pre_table, table, txt_path, partitions = [])
    spec = Array(partitions).join(',')
    partition_clause = spec.empty? ? '' : "PARTITION (#{spec})"
    %Q{
      LOAD DATA LOCAL INPATH '/app/spec/fixtures/#{txt_path}' OVERWRITE INTO TABLE #{pre_table};
      INSERT OVERWRITE TABLE #{table} #{partition_clause} SELECT * FROM #{pre_table};
    }.squish!
  end

  # @return [String] host path of +filename+ under the project's queries/ dir
  def path_for(filename)
    File.join(ROOT_PATH, "queries", filename)
  end

  # Renders a Hash of Hive settings as `--hiveconf key=value` CLI flags.
  # NOTE(review): keys/values are interpolated unescaped into a bash command
  # line, so values containing spaces or shell metacharacters will break.
  class HiveOptions
    attr_reader :options

    # @param options [Hash, nil] nil is treated as {}
    def initialize(options)
      @options = options || {}
    end

    def to_s
      options.map { |k, v| "--hiveconf #{k}=#{v}" }.join(' ')
    end
  end

  # Turns the caller's options Hash into the settings used by #run.
  # Recognized keys: :debug (default false), :hive (Hash for HiveOptions,
  # default {}), and :query_options (statements prepended to every query,
  # default []).
  def _get_settings(options = {})
    OpenStruct.new(
      debug_enabled: options.fetch(:debug) { false },
      hive_options: HiveOptions.new(options.fetch(:hive) { {} }),
      query_options: options.fetch(:query_options) { [] })
  end

  # Creates and starts the Hive container, with the project root mounted
  # read-write at /app.
  #
  # @raise [KeyError] if DOCKER_IMAGE_NAME is not set (fails fast instead of
  #   asking Docker for a nil image)
  # @return [String] the container id (also remembered for #container_id)
  def start_container
    puts "======== Begin start container..."
    container = Docker::Container.create(
      'Image' => ENV.fetch('DOCKER_IMAGE_NAME'),
      "HostConfig" => {
        'Binds' => ["#{ROOT_PATH}:/app:rw"],
      },
      # NOTE(review): -Xmx2024m leaves almost no headroom in a 2048 MB task
      # container; confirm whether 2048 (or a smaller heap) was intended.
      "Env": [
        "HIVE_OPTS=" + [
          "--hiveconf mapreduce.map.memory.mb=2048",
          "--hiveconf mapreduce.reduce.memory.mb=2048",
          "--hiveconf mapreduce.reduce.java.opts=-Xmx2024m",
          "--hiveconf mapreduce.map.java.opts=-Xmx2024m"].join(' '),
      ],
      "Volumes": {"#{ROOT_PATH}": {}},
      'Tty' => true)
    container.start
    @container_id = container.id
    puts "======== Done start container."
    @container_id
  end

  # Kills and removes the container started by #start_container. Without the
  # removal, every run would leave a stopped container behind. Does nothing
  # if no container was started (e.g. #start_container raised).
  def stop_container
    return unless container_id

    puts "======== Begin stop container..."
    container = Docker::Container.get(container_id)
    container.kill
    container.delete(force: true)
    @container_id = nil
    puts "======== Done stop container."
  end

  # @return [String, nil] id of the running container, if any
  attr_reader :container_id

  # Runs the query stored at +local_query_path+ through the Hive CLI inside
  # the container.
  #
  # settings.query_options are prepended to the query. They are meant to
  # hold setup statements such as custom UDF definitions, mirroring the
  # production create-udf.hql. The combined query is written to a fresh tmp
  # file and fed to `hive` on stdin.
  #
  # @return [Array<String>] the stdout chunks returned by the container exec
  def run(local_query_path, settings)
    query = normalize(settings.query_options.join + File.read(local_query_path))
    remote_path = File.join(OUT_PATH, write_tmp_query(query))
    command = ["/bin/bash", "-c", "/hive/bin/hive #{settings.hive_options} < #{remote_path}"]
    output, _stderr, exit_code = Docker::Container.get(container_id).exec(command, tty: true)
    # Surface Hive failures; otherwise callers only see empty results later.
    warn "======== hive exited with status #{exit_code}" unless exit_code.to_i.zero?

    if ENV['DEBUG'] || settings.debug_enabled
      File.write("/tmp/debug.txt", output.join)
    end
    output
  end

  # @return [String] a unique scratch file name ending in ".tmp"
  def new_name
    "#{SecureRandom.uuid}.tmp"
  end

  # Reads the SELECT stored in +query_path+ and returns its result rows.
  # @see #fetch_query
  def fetch(query_path, options = {})
    fetch_query File.read(query_path), options
  end

  # Runs a SELECT in Hive and returns its result rows.
  #
  # The SELECT is wrapped in INSERT OVERWRITE LOCAL DIRECTORY. Hive then
  # writes tab-separated part files into a *.parts directory under OUT_PATH,
  # and those files are read back from the host side.
  #
  # @param query [String] a single SELECT statement (comments and a trailing
  #   ";" are allowed)
  # @return [Array<Array<String>>] one array of column values per row
  def fetch_query(query, options = {})
    settings = _get_settings(options)
    parts_dir = new_name.sub(/\.tmp\z/, '.parts')

    # Clean the caller's SELECT *before* wrapping it. The wrapper ends up on
    # one line, so a `--` comment left in the SELECT would swallow the rest.
    select_sql = normalize(query).chomp(";\r\n")
    export_sql = %Q{
      INSERT OVERWRITE LOCAL DIRECTORY '#{File.join(OUT_PATH, parts_dir)}'
      ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '\\t'
      LINES TERMINATED BY '\\n'
      #{select_sql}
    }
    run(File.join(LOCAL_OUT_PATH, write_tmp_query(normalize(export_sql))), settings)

    # INSERT OVERWRITE produces one or more part files (000000_0, ...).
    # NOTE(review): `strip` also drops empty trailing columns and leading
    # spaces of the first column; kept for compatibility with existing specs.
    part_files = Dir.glob(File.join(LOCAL_OUT_PATH, parts_dir, '*')).sort.select { |path| File.file?(path) }
    rows = part_files.flat_map { |path| File.readlines(path) }.map { |line| line.strip.split("\t") }

    unless settings.debug_enabled
      # Best-effort cleanup (force: errors are ignored, like `rm ... || true`).
      FileUtils.rm_f(Dir.glob(File.join(LOCAL_OUT_PATH, '*.tmp')))
      FileUtils.rm_rf(Dir.glob(File.join(LOCAL_OUT_PATH, '*.parts')))
    end
    rows
  end

  # Runs the query file at +local_path+ (see #run for +options+).
  def execute(local_path, options = {})
    settings = _get_settings(options)
    run(local_path, settings)
  end

  # Runs one query String, or an Array of statements, for its side effects.
  #
  # Each Array element is normalized to "stmt;\r\n" and the elements are
  # concatenated without an extra separator, which previously produced empty
  # statements.
  def execute_query(query, options = {})
    query = query.map { |q| Helpers.normalize(q) }.join if query.is_a?(Array)
    local_path = File.join(LOCAL_OUT_PATH, write_tmp_query(normalize(query)))
    run(local_path, _get_settings(options))
  end

  private

  # Writes +content+ to a new uniquely named file in LOCAL_OUT_PATH (creating
  # the directory if needed) and returns the file name. The same name
  # resolves under OUT_PATH inside the container.
  def write_tmp_query(content)
    FileUtils.mkdir_p(LOCAL_OUT_PATH)
    name = new_name
    File.write(File.join(LOCAL_OUT_PATH, name), content)
    name
  end
end
# Global RSpec setup. Example groups get the Helpers mixin, and a Hive
# container is started (and loaded with the schema) before a group runs and
# stopped afterwards. Config-level :context hooks presumably fire once per
# top-level example group, so expect one container per group; confirm if
# startup cost matters.
RSpec.configure do |rspec|
  rspec.include Helpers
  rspec.pattern = '*_spec.rb'

  rspec.mock_with(:rspec) { |mocks| mocks.verify_partial_doubles = true }

  # Order matters here: disable_monkey_patching! also turns the global DSL
  # off, and the next line deliberately turns it back on so spec files can
  # call `describe` at the top level.
  rspec.disable_monkey_patching!
  rspec.expose_dsl_globally = true

  rspec.before(:context) do
    start_container
    execute_query Helpers::SCHEME_SQL
  end
  rspec.after(:context) { stop_container }

  # Random example order. Kernel's RNG is seeded from the same seed so a
  # failing run can be replayed with --seed.
  rspec.order = :random
  Kernel.srand rspec.seed
end
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment