Skip to content

Instantly share code, notes, and snippets.

@glennsarti
Last active July 20, 2022 05:51
Show Gist options
  • Save glennsarti/ddf9a3f7058af2ad4dc1b19e8b46a37d to your computer and use it in GitHub Desktop.
Save glennsarti/ddf9a3f7058af2ad4dc1b19e8b46a37d to your computer and use it in GitHub Desktop.
Run Tasks Mock Server
require 'webrick'
require 'net/http'
require 'json'
require 'securerandom'
POST_PATH_RE = /\/(?'result'pass|fail|random|invalid|none|actual|seccomp|costmgmt)(?:$|-(?'delay'[\d]+))/.freeze
LOGS_PATH_RE = /\/logs\/(?'id'[a-zA-Z\-0-9]+$)/.freeze
$pending_jobs = []
$exit_now = false
$log_dir = File.join(Dir.tmpdir, 'runtasks')
def now_timestamp
Time.now.utc.to_s
end
def respond_to(run_task)
server_and_write_log(run_task, "Processing request for #{run_task.parsed_body['task_result_id']} ...")
uri = URI(run_task.parsed_body['task_result_callback_url'])
is_ssl = uri.scheme == 'https'
server_and_write_log(run_task, "Connecting to #{uri.to_s}...")
Net::HTTP.start(uri.host, uri.port, :use_ssl => is_ssl) do |http|
request = Net::HTTP::Patch.new uri
resp_hash = {
data: {
type: 'task-results',
attributes: {
status: "#{run_task.status}",
message: "#{run_task.message}",
url: "https://#{ENV['TFE_FQDN']}/runtasks/logs/#{run_task.id}"
}
}
}
request.body = resp_hash.to_json
request['Content-Type'] = 'application/vnd.api+json'
request['Authorization'] = "Bearer #{run_task.parsed_body['access_token']}"
server_and_write_log(run_task, "Sending PATCH request to #{uri.to_s}: #{JSON.pretty_generate(resp_hash)}")
callback_response = http.request request
server_and_write_log(run_task, "Response from #{uri.to_s}: #{callback_response.inspect}")
if callback_response.code_type != Net::HTTPOK
raise "Callback response should be Ok but got #{callback_response.code_type}"
end
end
end
def perform_action(action)
server_and_write_log(action, "Processing action request for #{action.parsed_body['task_result_id']} ...")
case action.parsed_body['stage']
when 'pre_plan'
action.send_message('running', 'Starting pre_plan stage')
action.send_message('failed', "An error occured") unless perform_pre_plan_action(action)
when 'post_plan'
action.send_message('running', 'Starting post_plan stage')
action.send_message('failed', "An error occured") unless perform_post_plan_action(action)
else
action.send_message('failed', "Unknown stage #{action.parsed_body['stage']}")
end
end
def can_get_url(action, url, token)
server_and_write_log(action, "Attempting to fetch '#{url}' ...")
uri = URI(url)
is_ssl = uri.scheme == 'https'
response = nil
Net::HTTP.start(uri.host, uri.port, :use_ssl => is_ssl) do |http|
request = Net::HTTP::Get.new uri
request['Authorization'] = "Bearer #{token}"
server_and_write_log(action, "Sending GET request to #{uri.to_s}")
response = http.request request
rescue => ex
server_and_write_log(action, "Error: #{ex}")
return false
end
server_and_write_log(action, "Response: #{response.to_s}")
if response.code_type == Net::HTTPFound || response.code_type == Net::HTTPTemporaryRedirect
server_and_write_log(action, "Being redirected to #{response.header['location']} ...")
can_get_url(action, response.header['location'], token)
else
response.code_type == Net::HTTPOK
end
end
def perform_pre_plan_action(action)
# Get the configuration version
url = action.parsed_body['configuration_version_download_url']
if url.nil?
server_and_write_log(action, "The Configuration Version URL is missing")
return false
end
server_and_write_log(action, "Validating the Configuration Version URL ...")
result = can_get_url(action, url, action.parsed_body['access_token'])
action.send_message('passed', "Completed") if result
result
end
def perform_post_plan_action(action)
# Get the configuration version
url = action.parsed_body['plan_json_api_url']
if url.nil?
server_and_write_log(action, "The Plan JSON URL is missing")
return false
end
server_and_write_log(action, "Validating the Plan JSON URL ...")
result = can_get_url(action, url, action.parsed_body['access_token'])
action.send_message('passed', "Completed") if result
result
end
def parse_request_body(body)
return nil if body.nil? || body == ""
JSON.parse(body)
end
def initial_log(request_id, request)
File.open(File.join($log_dir, request_id), "w") do |f|
f.write("#{now_timestamp} Received request\n")
req = parse_request_body(request.body)
unless req.nil?
req_string = JSON.pretty_generate(req)
req_string.gsub!(req['access_token'], "redacted")
f.write("#{now_timestamp} Request: #{req_string}\n")
end
end
end
def write_log(delayed_response, message)
File.open(File.join($log_dir, delayed_response.id), "a") do |f|
f.write("#{now_timestamp} #{message}\n")
end
end
def server_and_write_log(delayed_response, message)
$server.logger.info("(#{delayed_response.id}) " + message)
write_log(delayed_response, message)
end
# Send a static response
class DelayedResponse
attr_accessor :id, :response_attempt, :request, :send_at, :status, :message, :url
def parsed_body
@parse_body ||= parse_request_body(request.body)
end
end
# Do something with the request
class DelayedAction < DelayedResponse
def send_message(status, message)
delayed_response = DelayedResponse.new.tap do |dr|
dr.id = self.id
dr.request = self.request
dr.send_at = Time::now
dr.status = status
dr.message = message
end
server_and_write_log(self, "Action message (#{status}): #{message}")
$pending_jobs.push(delayed_response)
end
end
class RunTask < WEBrick::HTTPServlet::AbstractServlet
def do_GET(request, response)
logs_path = request.path.match(LOGS_PATH_RE)
unless logs_path.nil?
$server.logger.info("Found logs route")
return log_get(request, response, logs_path.named_captures['id'])
end
$server.logger.info("No route found")
return default_get(request, response)
end
def log_get(request, response, id)
log_path = File.join($log_dir, id)
unless File.exist?(log_path)
response.status = 404
return
end
file_content = File.read(log_path)
file_content.gsub!(/&/, '&amp;')
file_content.gsub!(/</, '&lt;')
file_content.gsub!(/>/, '&gt;')
response.status = 200
response.body = "<html><body>Log file for #{id}<br /><hr /><pre>#{file_content}</pre></body></html>"
end
def default_get(request, response)
response.status = 200
response.body = <<-HTML
<html>
<body>
Welcome to the mock <a href="https://www.terraform.io/cloud-docs/integrations/run-tasks">Run Task</a> Service.<br />
<hr />
Available routes via POST to https://#{ENV['TFE_FQDN']}/runtasks/: <br />
<br />
<table border="1">
<tr><td>https://#{ENV['TFE_FQDN']}/runtasks/pass</td><td>Returns a passing Run Task</td></tr>
<tr><td>https://#{ENV['TFE_FQDN']}/runtasks/fail</td><td>Returns a failing Run Task</td></tr>
<tr><td>https://#{ENV['TFE_FQDN']}/runtasks/random</td><td>Returns a passing or failing Run Task at random</td></tr>
<tr><td>https://#{ENV['TFE_FQDN']}/runtasks/invalid</td><td>Returns an invalid Run Task response</td></tr>
<tr><td>https://#{ENV['TFE_FQDN']}/runtasks/none</td><td>Does not return a response</td></tr>
<tr><td>https://#{ENV['TFE_FQDN']}/runtasks/actual</td><td>Attempts to download assets from the request e.g. Plans and Configuration Versions</td></tr>
<tr><td>https://#{ENV['TFE_FQDN']}/runtasks/seccomp</td><td>Looks like a Security Compliance failure</td></tr>
<tr><td>https://#{ENV['TFE_FQDN']}/runtasks/costmgmr</td><td>Looks like a Cost Management success</td></tr>
</table><br />
By default the Run Task will send a random response from 2 to 10 seconds. You can specify the delay (in seconds) by adding '-[number]' to the URL. e.g. https://#{ENV['TFE_FQDN']}/runtasks/pass-20 would delay by 20 seconds<br />
<br />
The Run Task service can return a progress status message halfway. Add 'with-progress=true' to the URL e.g. https://#{ENV['TFE_FQDN']}/runtasks/pass-10?with-progress=true will send a progress response at 5 seconds and a pass response at 10 seconds<br />
<hr />
Available routes via GET to https://#{ENV['TFE_FQDN']}/runtasks/: <br />
<br />
<table border="1">
<tr><td>https://#{ENV['TFE_FQDN']}/logs/#id</td><td>Returns the log for task execution 'id'</td></tr>
</table><br />
<br />
</body>
</html>
HTML
end
def do_POST(request, response)
$server.logger.info("Received #{request.path}: #{request.body}")
route = nil
uri_path = request.path.match(POST_PATH_RE)
with_progress = request.query_string && request.query_string.include?('with-progress=')
unless uri_path.nil?
route = uri_path.named_captures['result']
delay = uri_path.named_captures['delay'].to_i
delay = rand(2...10) if delay.zero?
end
$server.logger.info("Found route: #{route}")
request_id = SecureRandom.uuid
case route
when 'random'
initial_log(request_id, request)
send_running(request_id, request, delay) if with_progress
if rand > 0.5
send_pass(request_id, request, delay)
else
send_fail(request_id, request, delay)
end
when 'pass'
initial_log(request_id, request)
send_running(request_id, request, delay) if with_progress
send_pass(request_id, request, delay)
when 'fail'
initial_log(request_id, request)
send_running(request_id, request, delay) if with_progress
send_fail(request_id, request, delay)
when 'invalid'
initial_log(request_id, request)
send_running(request_id, request, delay) if with_progress
send_fail(request_id, request, delay)
when 'none'
initial_log(request_id, request)
send_running(request_id, request, delay) if with_progress
when 'actual'
initial_log(request_id, request)
send_delayed_action(request_id, request, delay)
when 'seccomp'
initial_log(request_id, request)
send_fail(request_id, request, delay, "Found: 1 medium severity issues(s), 3 low severity issues(s). Severity threshold is set to low.")
when 'costmgmt'
initial_log(request_id, request)
send_pass(request_id, request, delay, "Monthly cost will increase by $1,303. Previous monthly cost: $0.00. New monthly cost: $1,303.")
else
response.status = 404
end
end
def send_pass(id, request, delay, message = nil)
send_delayed_response(id, request, 'passed', delay, message)
end
def send_fail(id, request, delay, message = nil)
send_delayed_response(id, request, 'failed', delay, message)
end
def send_running(id, request, delay, message = nil)
send_delayed_response(id, request, 'running', delay / 2, message)
end
def send_invalid(id, request, delay, message = nil)
send_delayed_response(id, request, 'invalid', delay, message)
end
def send_delayed_action(id, request, delay)
$server.logger.info("Delaying action by #{delay} seconds")
delayed_action = DelayedAction.new.tap do |dr|
dr.id = id
dr.request = request
dr.send_at = Time::now + delay
end
write_log(delayed_action, "Sending action after #{delayed_action.send_at}")
$pending_jobs.push(delayed_action)
end
def send_delayed_response(id, request, status, delay, message = nil)
$server.logger.info("Delaying '#{status}' response by #{delay} seconds")
message = "Mock #{status} response" if message.nil?
delayed_response = DelayedResponse.new.tap do |dr|
dr.id = id
dr.request = request
dr.send_at = Time::now + delay
dr.status = status
dr.message = message
end
write_log(delayed_response, "Sending response after #{delayed_response.send_at}")
$pending_jobs.push(delayed_response)
end
end
$server = WEBrick::HTTPServer.new :Port => 8080, :Host => '0.0.0.0'
$server.mount '/', RunTask
Thread.new do
$server.logger.info("Starting job thread...")
while !$exit_now
sleep 1
idx = -1
while !idx.nil?
idx = $pending_jobs.find_index { |dr| Time::now > dr.send_at }
if !idx.nil?
job = $pending_jobs[idx]
$pending_jobs.delete_at(idx)
job.response_attempt = 0 if job.response_attempt.nil?
begin
case job
when DelayedAction
job.response_attempt += 1
perform_action(job)
else
job.response_attempt += 1
respond_to(job)
end
rescue => ex
server_and_write_log(job, "Error while running job: #{ex}")
if job.response_attempt > 3
server_and_write_log(job, "Too many attempts.")
else
server_and_write_log(job, "Requeuing in 1 seconds time...")
job.send_at = Time::now + 1
$pending_jobs.push(job)
end
end
end
end
end
end
trap('INT') { # stop server with Ctrl-C
$server.stop
$exit_now = true
}
unless Dir.exist?($log_dir)
$server.logger.info("Creating log directory #{$log_dir}")
Dir.mkdir($log_dir)
end
$server.logger.info("Starting Run Task Service...")
$server.start
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment