Last active
August 29, 2015 14:21
-
-
Save mpelos/94e2167e12c1de3382ae to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Base class for workers that drive a pipeline one step at a time.
#
# A concrete worker implements {#start} and provides one `<step_class>_step`
# method per step class it handles (see {#current_runner}).
class Cloudy::CloudWorker
  # Raised by {#execute_current_step} when there is no current step.
  class CurrentStepNotFound < StandardError; end

  # Raised by {#current_runner} when no builder method matches the step class.
  class StepClassNotFound < StandardError; end

  # Raised by {#async_execute_first_step} when the pipeline has no first step.
  class CanNotAsyncExecuteFirstStep < StandardError; end

  # Raised by workers when the runner for the current step cannot be built.
  class RunnerInitializationFailure < StandardError; end

  # @!attribute customer_repository
  #   @return [CustomerRepository] the customer repository
  attr_accessor :customer_repository

  # @!attribute pipeline_manager
  #   @return [Cloudy::PipelineManager] the pipeline manager
  attr_accessor :pipeline_manager

  # @!attribute pipeline
  #   @return [Pipeline] the pipeline
  attr_accessor :pipeline

  # @!attribute current_step_id
  #   @return [Integer] the current step id
  attr_accessor :current_step_id

  # @!attribute [r] customer_id
  #   @return [Integer] the customer id
  attr_reader :customer_id

  # @!attribute pipeline_consumer
  #   @return [#perform_async] the pipeline consumer class
  attr_accessor :pipeline_consumer

  # @param customer_id [Integer]
  # @param customer_repository [CustomerRepository]
  # @param pipeline_manager [Cloudy::PipelineManager]
  # @param pipeline_consumer [#perform_async] the pipeline consumer
  # @param pipeline [Pipeline]
  # @param current_step_id [Integer] the current step id if any
  # @note the given pipeline (nil included) is also assigned to the
  #   pipeline manager
  def initialize(customer_id:,
                 customer_repository: CustomerRepository.new,
                 pipeline_manager: Cloudy::PipelineManager.new,
                 pipeline_consumer: Job::PipelineConsumer,
                 pipeline: nil,
                 current_step_id: nil)
    @customer_repository = customer_repository
    @customer_id = customer_id
    @current_step_id = current_step_id
    @pipeline_manager = pipeline_manager
    @pipeline = pipeline
    @pipeline_consumer = pipeline_consumer
    @pipeline_manager.pipeline = pipeline
  end

  # Looks the customer up by {#customer_id} and memoizes the result.
  # @return [Customer]
  def customer
    @customer ||= customer_repository.find_by_id(customer_id)
  end

  # Starts the creation
  # @return [Pipeline] the pipeline that will handle the creation
  # @note this method must schedule the first step execution
  # @raise [NotImplementedError] always; subclasses must override it
  def start
    raise NotImplementedError
  end

  # Returns the pipeline type: the last segment of the worker class name,
  # underscored (relies on `String#underscore`, which is not core Ruby).
  # @return [String]
  def pipeline_type
    self.class.to_s.split("::").last.underscore
  end

  # Executes the current step: runs the current runner, copies the runner's
  # async result onto the step and saves the step.
  # @raise [CurrentStepNotFound] if the current step is not found
  def execute_current_step
    raise CurrentStepNotFound unless current_step

    current_runner.run
    current_step.async_result = current_runner.async_result
    pipeline_manager.save_step current_step
  end

  # Returns the current runner
  #
  # @note the default implementation tries to find a method named after the
  #   current_step#step_class with the "_step" string appended. eg.
  #
  #   ```
  #   current_step.step_class = "Cloudy::AWS::Runner::KeyPairCreator"
  #   ```
  #
  #   The method in self that will be called is:
  #
  #   ```
  #   #cloudy_aws_runner_key_pair_creator_step
  #   ```
  #
  # @return [Cloudy::Runner, nil] the current runner, or nil when there is
  #   no current step
  #
  # @raise [StepClassNotFound] if the `current_step#step_class` is unknown
  def current_runner
    return unless current_step
    return @current_runner if @current_runner

    method_name = current_step.step_class.underscore.tr("/", "_") + "_step"
    # Private and protected builder methods are accepted too (second argument).
    raise StepClassNotFound, method_name unless respond_to?(method_name, true)

    @current_runner = send(method_name)
  end

  # Returns whether or not the current step is done
  # @return [Boolean, nil] whatever the runner's `done?` returns, or nil
  #   when there is no current runner
  def current_step_done?
    current_runner.try(:done?)
  end

  # Returns whether or not the pipeline is done
  # @return [Boolean] whether or not the pipeline is done
  def pipeline_done?
    pipeline_manager.pipeline_done?
  end

  # Marks the current step as done, stores the runner output on it, saves it
  # and flags the whole pipeline as done when the manager reports it is.
  # @note unlike {#current_step_failed!} there is no guard here: a current
  #   step must exist
  def current_step_done!
    current_step.done = true
    current_step.output = current_runner.output
    pipeline_manager.save_step current_step
    pipeline_manager.pipeline_done! if pipeline_manager.pipeline_done?
  end

  # Returns the current step if any (memoized once found)
  # @return [PipelineStep, nil] the current step or nil
  def current_step
    @current_step ||= pipeline_manager.step_by_id(current_step_id) if current_step_id
  end

  # Marks the current step as a failure and saves it. Does nothing when
  # there is no current step.
  def current_step_failed!
    return unless current_step

    current_step.failed = true
    begin
      current_step.output = current_runner.try(:output)
    rescue
      # Imagine current_runner implementation has a bug, so the only
      # way to avoid exploding here again is to rescue any errors this
      # may throw.
    end
    pipeline_manager.save_step current_step
  end

  protected

  # Schedules the first pipeline step for execution
  # @raise [CanNotAsyncExecuteFirstStep] if there is no first step
  def async_execute_first_step
    step = pipeline_manager.first_step
    raise CanNotAsyncExecuteFirstStep unless step

    async_execute_step step
  end

  # Schedules a step async execution
  # @param step [PipelineStep]
  def async_execute_step(step)
    pipeline_consumer.perform_async step.id
  end
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Worker that updates the auto scaling formation of a topology through a
# pipeline made of a single `AutoScalingFormationUpdater` step.
class Cloudy::AWS::AutoScalingFormationUpdaterWorker < Cloudy::CloudWorker
  # @!attribute [r] topology
  #   @return [Topology]
  attr_reader :topology

  # @!attribute [r] min_instances
  #   @return [Integer, nil]
  attr_reader :min_instances

  # @!attribute [r] max_instances
  #   @return [Integer, nil]
  attr_reader :max_instances

  # @!attribute [r] profile_repository
  #   @return [ProfileRepository]
  attr_reader :profile_repository

  # @!attribute [r] cloud_formation_stack_name
  #   @return [String]
  attr_reader :cloud_formation_stack_name

  # @param topology_id [Integer]
  # @param min_instances [Integer, nil]
  # @param max_instances [Integer, nil]
  # @param topology_repository [TopologyRepository]
  # @param profile_repository [ProfileRepository]
  # @param auto_scaling_formation_stack_name_class [Class]
  # @param kwargs [Hash] remaining keyword arguments, forwarded to
  #   Cloudy::CloudWorker#initialize
  def initialize(
    topology_id:,
    min_instances: nil,
    max_instances: nil,
    topology_repository: TopologyRepository.new,
    profile_repository: ProfileRepository.new,
    auto_scaling_formation_stack_name_class: Cloudy::AWS::AutoScalingFormationStackName,
    **kwargs
  )
    # The topology is loaded first because the customer id handed to the
    # base class comes from it.
    @topology = topology_repository.find_by_id(topology_id)
    super(customer_id: topology.customer_id, **kwargs)
    @provider = topology.provider
    @min_instances = min_instances
    @max_instances = max_instances
    @profile_repository = profile_repository
    @cloud_formation_stack_name = auto_scaling_formation_stack_name_class.new(topology: topology).to_s
  end

  # Builds the runner for the auto scaling formation updater step by
  # instantiating the class named in `current_step.step_class`.
  # @return [Object] the runner instance
  def cloudy_aws_runner_auto_scaling_formation_updater_step
    current_step.step_class.constantize.new(
      stack_name: cloud_formation_stack_name,
      product_name: topology.product_name,
      min_instances: min_instances,
      max_instances: max_instances,
      access_key: profile.user,
      secret_key: profile.password,
      region: topology.region,
      async_result: current_step.async_result
    )
  end

  # Returns the (memoized) runner for the current step, dispatching
  # explicitly on the step class name. Any failure marks the current step
  # as failed before the error is raised.
  #
  # @return [Object, nil] the runner, or nil when there is no current step
  # @raise [Cloudy::CloudWorker::StepClassNotFound] if the step class is
  #   not handled by this worker
  # @raise [Cloudy::CloudWorker::RunnerInitializationFailure] on any other
  #   error, carrying the original message and backtrace
  def current_runner
    return unless current_step
    return @current_runner if @current_runner

    @current_runner =
      case current_step.step_class
      when Cloudy::AWS::Runner::AutoScalingFormationUpdater.name
        cloudy_aws_runner_auto_scaling_formation_updater_step
      else
        raise Cloudy::CloudWorker::StepClassNotFound
      end
  rescue Cloudy::CloudWorker::StepClassNotFound
    mark_the_current_step_as_failed
    raise
  rescue => e
    mark_the_current_step_as_failed
    # NOTE(review): RunnerInitializationFailure has to be declared by
    # Cloudy::CloudWorker - confirm it exists there.
    raise Cloudy::CloudWorker::RunnerInitializationFailure, e.message, e.backtrace
  end

  # Returns whether the current step is done.
  # @note this predicate has a side effect: when the step is done it is
  #   also recorded as done through the inherited #current_step_done!
  # @return [true, nil] true when done, nil otherwise
  def current_step_done?
    return unless super

    current_step_done!
    true
  end

  # Looks up (and memoizes) the profile for the topology's customer and
  # provider.
  # @return [Object] whatever the profile repository returns
  def profile
    @profile ||= profile_repository.find_by_customer_id_and_provider_id(
      topology.customer_id,
      topology.provider_id
    )
  end

  # Arguments stored alongside the pipeline as `responsible_class_args`.
  # @return [Hash] topology_id, min_instances and max_instances
  def responsible_class_args
    @responsible_class_args ||= {
      topology_id: topology.id,
      min_instances: min_instances,
      max_instances: max_instances
    }
  end

  # Creates the pipeline with its single updater step and schedules that
  # step for asynchronous execution.
  # @return [Pipeline] the created pipeline
  def start
    @pipeline = pipeline_manager.create_pipeline(
      customer: customer,
      provider: topology.provider,
      pipeline_type: pipeline_type,
      responsible_class: self.class.name,
      responsible_class_args: responsible_class_args
    )
    pipeline_manager.create_step step_class: Cloudy::AWS::Runner::AutoScalingFormationUpdater.name, priority: 0
    async_execute_first_step
    pipeline
  end

  private

  # Flags the current step as failed and saves it. It deliberately bypasses
  # the inherited #current_step_failed!, which asks #current_runner for its
  # output and would re-enter the runner construction that just failed.
  def mark_the_current_step_as_failed
    current_step.failed = true
    pipeline_manager.save_step current_step
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment