Last active
July 20, 2022 05:51
-
-
Save glennsarti/ddf9a3f7058af2ad4dc1b19e8b46a37d to your computer and use it in GitHub Desktop.
Run Tasks Mock Server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'json'
require 'net/http'
require 'securerandom'
require 'tmpdir'
require 'webrick'
# Matches the last POST route segment, e.g. "/runtasks/pass" or
# "/runtasks/fail-20". `result` is the route name and `delay` (optional) is the
# number after the dash. Anchored with \z rather than $ so that a newline in
# the path cannot satisfy the end-of-route check.
POST_PATH_RE = %r{/(?<result>pass|fail|random|invalid|none|actual|seccomp|costmgmt)(?:\z|-(?<delay>\d+))}.freeze

# Matches a log request such as "/runtasks/logs/<id>"; `id` may only contain
# letters, digits and dashes and must run to the very end of the path.
LOGS_PATH_RE = %r{/logs/(?<id>[a-zA-Z\-0-9]+)\z}.freeze

# Jobs (DelayedResponse / DelayedAction) waiting to be picked up by the job thread.
$pending_jobs = []
# Set to true by the INT trap to make the job thread leave its loop.
$exit_now = false
# Directory holding one log file per received request, named after the request id.
$log_dir = File.join(Dir.tmpdir, 'runtasks')
# Returns the current wall-clock time, expressed in UTC, as a String.
# Used as the prefix of every line written to a request log file.
def now_timestamp
  current = Time.now
  current.getutc.to_s
end
# Sends the result held by +run_task+ back to the callback URL found in the
# original request body, as a JSON:API PATCH request.
#
# @param run_task [DelayedResponse] job carrying id, status, message and the
#   original request (read through #parsed_body)
# @raise [RuntimeError] when the callback endpoint answers with anything other
#   than Net::HTTPOK
def respond_to(run_task)
  payload = run_task.parsed_body
  server_and_write_log(run_task, "Processing request for #{payload['task_result_id']} ...")

  callback_uri = URI(payload['task_result_callback_url'])
  server_and_write_log(run_task, "Connecting to #{callback_uri}...")

  Net::HTTP.start(callback_uri.host, callback_uri.port, use_ssl: callback_uri.scheme == 'https') do |http|
    result = {
      data: {
        type: 'task-results',
        attributes: {
          status: run_task.status.to_s,
          message: run_task.message.to_s,
          # Link back to this service's own log page for the request.
          url: "https://#{ENV['TFE_FQDN']}/runtasks/logs/#{run_task.id}"
        }
      }
    }

    patch = Net::HTTP::Patch.new(callback_uri)
    patch.body = result.to_json
    patch['Content-Type'] = 'application/vnd.api+json'
    patch['Authorization'] = "Bearer #{payload['access_token']}"

    server_and_write_log(run_task, "Sending PATCH request to #{callback_uri}: #{JSON.pretty_generate(result)}")
    callback_response = http.request(patch)
    server_and_write_log(run_task, "Response from #{callback_uri}: #{callback_response.inspect}")

    # Raising here lets the caller decide whether to retry the job.
    unless callback_response.code_type == Net::HTTPOK
      raise "Callback response should be Ok but got #{callback_response.code_type}"
    end
  end
end
# Runs the stage-specific check for a DelayedAction and reports progress via
# action.send_message: 'running' before a known stage starts, and 'failed' when
# the stage handler returns a falsy value or the stage is not recognised.
#
# @param action [DelayedAction] job whose request body names the stage
def perform_action(action)
  server_and_write_log(action, "Processing action request for #{action.parsed_body['task_result_id']} ...")

  stage = action.parsed_body['stage']
  handlers = {
    'pre_plan' => :perform_pre_plan_action,
    'post_plan' => :perform_post_plan_action
  }
  handler = handlers[stage]
  return action.send_message('failed', "Unknown stage #{stage}") if handler.nil?

  action.send_message('running', "Starting #{stage} stage")
  succeeded = send(handler, action)
  action.send_message('failed', "An error occured") unless succeeded
end
# Checks whether +url+ can be fetched with a bearer-token GET request.
#
# 302 and 307 responses are followed (the same token is sent to the redirect
# target), but only up to +redirect_limit+ hops so a redirect loop cannot
# recurse forever. Relative Location headers are resolved against +url+.
#
# @param action [DelayedAction] job used for logging
# @param url [String] URL to fetch
# @param token [String] bearer token placed in the Authorization header
# @param redirect_limit [Integer] how many further redirects may be followed
# @return [Boolean] true only when the final response is Net::HTTPOK; false on
#   a request error, a non-OK response, a redirect without a Location header,
#   or too many redirects
def can_get_url(action, url, token, redirect_limit = 5)
  server_and_write_log(action, "Attempting to fetch '#{url}' ...")
  uri = URI(url)
  response = nil

  Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == 'https') do |http|
    request = Net::HTTP::Get.new(uri)
    request['Authorization'] = "Bearer #{token}"
    server_and_write_log(action, "Sending GET request to #{uri}")
    response = http.request(request)
  rescue => ex
    # Only errors raised while the request is in flight are treated as
    # "cannot get"; this return leaves can_get_url itself.
    server_and_write_log(action, "Error: #{ex}")
    return false
  end

  server_and_write_log(action, "Response: #{response.inspect}")

  redirect_types = [Net::HTTPFound, Net::HTTPTemporaryRedirect]
  return response.code_type == Net::HTTPOK unless redirect_types.include?(response.code_type)

  location = response['location']
  if location.nil?
    server_and_write_log(action, "Redirect response did not include a Location header")
    return false
  end
  if redirect_limit <= 0
    server_and_write_log(action, "Too many redirects, giving up")
    return false
  end

  # Location may be relative; resolve it against the URL that was just fetched.
  target = URI.join(url, location).to_s
  server_and_write_log(action, "Being redirected to #{target} ...")
  can_get_url(action, target, token, redirect_limit - 1)
end
# Pre-plan stage check: verifies that the configuration version download URL
# from the request body can be fetched, and queues a 'passed' message when it can.
#
# @param action [DelayedAction]
# @return [Boolean] false when the URL is missing; otherwise the result of can_get_url
def perform_pre_plan_action(action)
  payload = action.parsed_body
  download_url = payload['configuration_version_download_url']

  if download_url.nil?
    server_and_write_log(action, "The Configuration Version URL is missing")
    false
  else
    server_and_write_log(action, "Validating the Configuration Version URL ...")
    reachable = can_get_url(action, download_url, payload['access_token'])
    action.send_message('passed', "Completed") if reachable
    reachable
  end
end
# Post-plan stage check: verifies that the plan JSON API URL from the request
# body can be fetched, and queues a 'passed' message when it can.
#
# @param action [DelayedAction]
# @return [Boolean] false when the URL is missing; otherwise the result of can_get_url
def perform_post_plan_action(action)
  payload = action.parsed_body
  plan_url = payload['plan_json_api_url']

  if plan_url.nil?
    server_and_write_log(action, "The Plan JSON URL is missing")
    false
  else
    server_and_write_log(action, "Validating the Plan JSON URL ...")
    reachable = can_get_url(action, plan_url, payload['access_token'])
    action.send_message('passed', "Completed") if reachable
    reachable
  end
end
# Parses a raw request body as JSON.
#
# @param body [String, nil] raw body text
# @return [Object, nil] the parsed JSON value, or nil when the body is nil or empty
# @raise [JSON::ParserError] when the body is not valid JSON
def parse_request_body(body)
  blank = body.nil? || body == ''
  blank ? nil : JSON.parse(body)
end
# Creates (or truncates) the log file for +request_id+ and records the incoming
# request, with the access token replaced by "redacted".
#
# Redaction is skipped when the body is not a JSON object or carries no
# non-empty string token: substituting nil raises, and substituting an empty
# string would insert "redacted" between every character of the log.
#
# @param request_id [String] id used as the log file name inside $log_dir
# @param request [#body] incoming request; its body is parsed as JSON
def initial_log(request_id, request)
  File.open(File.join($log_dir, request_id), 'w') do |f|
    f.write("#{now_timestamp} Received request\n")
    req = parse_request_body(request.body)
    unless req.nil?
      req_string = JSON.pretty_generate(req)
      token = req['access_token'] if req.is_a?(Hash)
      # Replace every occurrence, not just the access_token field itself.
      req_string = req_string.gsub(token, 'redacted') if token.is_a?(String) && !token.empty?
      f.write("#{now_timestamp} Request: #{req_string}\n")
    end
  end
end
# Appends one timestamped line to the log file of the given job.
#
# @param delayed_response [#id] job whose id names the log file in $log_dir
# @param message [String] text to record
def write_log(delayed_response, message)
  log_file = File.join($log_dir, delayed_response.id)
  File.write(log_file, "#{now_timestamp} #{message}\n", mode: 'a')
end
# Logs +message+ twice: to the server logger (prefixed with the job id in
# parentheses) and to the job's own log file.
#
# @param delayed_response [#id] job the message belongs to
# @param message [String] text to record
def server_and_write_log(delayed_response, message)
  prefix = "(#{delayed_response.id}) "
  $server.logger.info(prefix + message)
  write_log(delayed_response, message)
end
# A queued job that sends a fixed status/message back to the callback URL
# once its send_at time has passed.
class DelayedResponse
  attr_accessor :id, :response_attempt, :request, :send_at
  attr_accessor :status, :message, :url

  # The JSON-decoded body of the original request. A truthy result is cached,
  # so the body is parsed once per job.
  def parsed_body
    @parse_body = parse_request_body(request.body) unless @parse_body
    @parse_body
  end
end
# A queued job that performs real work against the request (see perform_action)
# instead of sending a canned answer.
class DelayedAction < DelayedResponse
  # Queues an immediate DelayedResponse carrying +status+ and +message+ for the
  # same request id, so the job thread delivers it on its next pass.
  def send_message(status, message)
    follow_up = DelayedResponse.new
    follow_up.id = id
    follow_up.request = request
    follow_up.send_at = Time.now
    follow_up.status = status
    follow_up.message = message

    server_and_write_log(self, "Action message (#{status}): #{message}")
    $pending_jobs.push(follow_up)
  end
end
# WEBrick servlet implementing the mock Run Task endpoints.
#
# GET serves either the log page of a previous request (paths matching
# LOGS_PATH_RE) or a static help page. POST requests whose path matches
# POST_PATH_RE write an initial log entry and queue jobs on $pending_jobs;
# the servlet itself never contacts the callback URL.
class RunTask < WEBrick::HTTPServlet::AbstractServlet
  # Routes GET requests to the log viewer or the help page.
  def do_GET(request, response)
    logs_path = request.path.match(LOGS_PATH_RE)
    unless logs_path.nil?
      $server.logger.info("Found logs route")
      return log_get(request, response, logs_path.named_captures['id'])
    end

    $server.logger.info("No route found")
    default_get(request, response)
  end

  # Renders the log file for +id+ as a small HTML page, or responds 404 when
  # no such log exists.
  def log_get(request, response, id)
    log_path = File.join($log_dir, id)
    unless File.exist?(log_path)
      response.status = 404
      return
    end

    # The log echoes request bodies and remote responses, so it has to be
    # HTML-escaped before being embedded in the page.
    file_content = WEBrick::HTMLUtils.escape(File.read(log_path))
    response.status = 200
    response.body = "<html><body>Log file for #{id}<br /><hr /><pre>#{file_content}</pre></body></html>"
  end

  # Renders the help page listing the available routes.
  def default_get(request, response)
    response.status = 200
    response.body = <<~HTML
      <html>
      <body>
      Welcome to the mock <a href="https://www.terraform.io/cloud-docs/integrations/run-tasks">Run Task</a> Service.<br />
      <hr />
      Available routes via POST to https://#{ENV['TFE_FQDN']}/runtasks/: <br />
      <br />
      <table border="1">
      <tr><td>https://#{ENV['TFE_FQDN']}/runtasks/pass</td><td>Returns a passing Run Task</td></tr>
      <tr><td>https://#{ENV['TFE_FQDN']}/runtasks/fail</td><td>Returns a failing Run Task</td></tr>
      <tr><td>https://#{ENV['TFE_FQDN']}/runtasks/random</td><td>Returns a passing or failing Run Task at random</td></tr>
      <tr><td>https://#{ENV['TFE_FQDN']}/runtasks/invalid</td><td>Returns an invalid Run Task response</td></tr>
      <tr><td>https://#{ENV['TFE_FQDN']}/runtasks/none</td><td>Does not return a response</td></tr>
      <tr><td>https://#{ENV['TFE_FQDN']}/runtasks/actual</td><td>Attempts to download assets from the request e.g. Plans and Configuration Versions</td></tr>
      <tr><td>https://#{ENV['TFE_FQDN']}/runtasks/seccomp</td><td>Looks like a Security Compliance failure</td></tr>
      <tr><td>https://#{ENV['TFE_FQDN']}/runtasks/costmgmt</td><td>Looks like a Cost Management success</td></tr>
      </table><br />
      By default the Run Task will send a random response from 2 to 10 seconds. You can specify the delay (in seconds) by adding '-[number]' to the URL. e.g. https://#{ENV['TFE_FQDN']}/runtasks/pass-20 would delay by 20 seconds<br />
      <br />
      The Run Task service can return a progress status message halfway. Add 'with-progress=true' to the URL e.g. https://#{ENV['TFE_FQDN']}/runtasks/pass-10?with-progress=true will send a progress response at 5 seconds and a pass response at 10 seconds<br />
      <hr />
      Available routes via GET to https://#{ENV['TFE_FQDN']}/runtasks/: <br />
      <br />
      <table border="1">
      <tr><td>https://#{ENV['TFE_FQDN']}/runtasks/logs/#id</td><td>Returns the log for task execution 'id'</td></tr>
      </table><br />
      <br />
      </body>
      </html>
    HTML
  end

  # Routes POST requests: picks the behaviour from the path, writes the initial
  # log entry and queues the matching delayed job(s). Unknown paths get a 404.
  def do_POST(request, response)
    $server.logger.info("Received #{request.path}: #{request.body}")
    route = nil
    uri_path = request.path.match(POST_PATH_RE)
    # NOTE: any value enables progress messages, including with-progress=false.
    with_progress = request.query_string && request.query_string.include?('with-progress=')
    unless uri_path.nil?
      route = uri_path.named_captures['result']
      delay = uri_path.named_captures['delay'].to_i
      # No (or zero) delay requested: pick one between 2 and 10 seconds inclusive.
      delay = rand(2..10) if delay.zero?
    end
    $server.logger.info("Found route: #{route}")
    request_id = SecureRandom.uuid

    case route
    when 'random'
      initial_log(request_id, request)
      send_running(request_id, request, delay) if with_progress
      if rand > 0.5
        send_pass(request_id, request, delay)
      else
        send_fail(request_id, request, delay)
      end
    when 'pass'
      initial_log(request_id, request)
      send_running(request_id, request, delay) if with_progress
      send_pass(request_id, request, delay)
    when 'fail'
      initial_log(request_id, request)
      send_running(request_id, request, delay) if with_progress
      send_fail(request_id, request, delay)
    when 'invalid'
      initial_log(request_id, request)
      send_running(request_id, request, delay) if with_progress
      # Deliberately reports a status value of 'invalid' rather than 'failed'.
      send_invalid(request_id, request, delay)
    when 'none'
      initial_log(request_id, request)
      send_running(request_id, request, delay) if with_progress
    when 'actual'
      initial_log(request_id, request)
      send_delayed_action(request_id, request, delay)
    when 'seccomp'
      initial_log(request_id, request)
      send_fail(request_id, request, delay, "Found: 1 medium severity issues(s), 3 low severity issues(s). Severity threshold is set to low.")
    when 'costmgmt'
      initial_log(request_id, request)
      send_pass(request_id, request, delay, "Monthly cost will increase by $1,303. Previous monthly cost: $0.00. New monthly cost: $1,303.")
    else
      response.status = 404
    end
  end

  # Queues a 'passed' response after +delay+ seconds.
  def send_pass(id, request, delay, message = nil)
    send_delayed_response(id, request, 'passed', delay, message)
  end

  # Queues a 'failed' response after +delay+ seconds.
  def send_fail(id, request, delay, message = nil)
    send_delayed_response(id, request, 'failed', delay, message)
  end

  # Queues a 'running' progress response at half of +delay+ (integer division).
  def send_running(id, request, delay, message = nil)
    send_delayed_response(id, request, 'running', delay / 2, message)
  end

  # Queues a response with the status 'invalid' after +delay+ seconds.
  def send_invalid(id, request, delay, message = nil)
    send_delayed_response(id, request, 'invalid', delay, message)
  end

  # Queues a DelayedAction that becomes due +delay+ seconds from now.
  def send_delayed_action(id, request, delay)
    $server.logger.info("Delaying action by #{delay} seconds")
    delayed_action = DelayedAction.new.tap do |dr|
      dr.id = id
      dr.request = request
      dr.send_at = Time.now + delay
    end
    write_log(delayed_action, "Sending action after #{delayed_action.send_at}")
    $pending_jobs.push(delayed_action)
  end

  # Queues a DelayedResponse with +status+ that becomes due +delay+ seconds
  # from now. When +message+ is nil a "Mock <status> response" text is used.
  def send_delayed_response(id, request, status, delay, message = nil)
    $server.logger.info("Delaying '#{status}' response by #{delay} seconds")
    message = "Mock #{status} response" if message.nil?
    delayed_response = DelayedResponse.new.tap do |dr|
      dr.id = id
      dr.request = request
      dr.send_at = Time.now + delay
      dr.status = status
      dr.message = message
    end
    write_log(delayed_response, "Sending response after #{delayed_response.send_at}")
    $pending_jobs.push(delayed_response)
  end
end
# The HTTP server, listening on every interface on port 8080; all requests are
# handled by the RunTask servlet.
$server = WEBrick::HTTPServer.new({ Port: 8080, Host: '0.0.0.0' })
$server.mount('/', RunTask)

# Background worker: once a second, run every queued job whose send_at time
# has passed. A job that raises is put back on the queue one second later,
# unless it has already been attempted more than 3 times.
Thread.new do
  $server.logger.info("Starting job thread...")
  until $exit_now
    sleep 1
    loop do
      due_index = $pending_jobs.find_index { |pending| Time.now > pending.send_at }
      break if due_index.nil?

      job = $pending_jobs.delete_at(due_index)
      job.response_attempt = 0 if job.response_attempt.nil?
      begin
        job.response_attempt += 1
        case job
        when DelayedAction then perform_action(job)
        else respond_to(job)
        end
      rescue => ex
        server_and_write_log(job, "Error while running job: #{ex}")
        if job.response_attempt > 3
          server_and_write_log(job, "Too many attempts.")
        else
          server_and_write_log(job, "Requeuing in 1 seconds time...")
          job.send_at = Time.now + 1
          $pending_jobs.push(job)
        end
      end
    end
  end
end

# Ctrl-C stops the server and tells the job thread to finish.
trap('INT') do
  $server.stop
  $exit_now = true
end

# Request logs are written under $log_dir; create it on first run.
unless File.directory?($log_dir)
  $server.logger.info("Creating log directory #{$log_dir}")
  Dir.mkdir($log_dir)
end

$server.logger.info("Starting Run Task Service...")
$server.start
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment