diff --git a/examples/awesome.asl b/examples/awesome.asl new file mode 100644 index 00000000..24e497a3 --- /dev/null +++ b/examples/awesome.asl @@ -0,0 +1,27 @@ +{ + "Comment": "Directory Listing", + "StartAt": "a", + "States": { + "a":{ + "Type": "Pass", + "Next": "b" + }, + "b": { + "Type": "Wait", + "Seconds": 1, + "Next": "ls" + }, + "ls": { + "Type": "Task", + "Resource": "awesome://ls -l Gemfile", + "Comment": "awesome://ls -l $FILENAME", + "Next": "c", + "Parameters": { + "FILENAME" : "Gemfile" + } + }, + "c": { + "Type": "Succeed" + } + } +} diff --git a/lib/floe/awesome_runner.rb b/lib/floe/awesome_runner.rb new file mode 100644 index 00000000..8520d9d5 --- /dev/null +++ b/lib/floe/awesome_runner.rb @@ -0,0 +1,130 @@ +# frozen_string_literal: true + +require "concurrent/array" + +module Floe + class AwesomeProcess < Thread + attr_reader :result + attr_accessor :error + + def initialize(queue, context, *args) + self.report_on_exception = true + @processed = false + @context = context + + # Don't like changing the value of context here, + # but want to make sure thread is set before the `queue.push` + # `queue.pop` will look potentially at status, which is through thread + context["thread"] = self + + super do + @result = AwesomeSpawn.run(*args) + + # this is changing the value of the context + # in the non-main thread + # Potential race condition here + Floe::AwesomeRunner.populate_results!(@context, :result => @result) + + # trigger an event + queue.push(["delete", context]) + rescue => err + # Shouldn't ever get in here + @error = err + + Floe::AwesomeRunner.populate_results!(@context, :error => err) + + # trigger an event + queue.push(["delete", context]) + end + end + end + + class AwesomeRunner < Floe::Runner + SCHEME = "awesome" + SCHEME_PREFIX = "#{SCHEME}://" + SCHEME_OFFSET = SCHEME.length + 3 + + # only exposed for tests + # use wait instead + attr_reader :queue + + def initialize(_options = {}) + require "awesome_spawn" + + # events triggered + @queue = Queue.new + + super + end + + # @return [Hash] runner_context + def run_async!(resource, params = {}, _secrets = {}, _context = {}) + raise ArgumentError, "Invalid resource" unless resource&.start_with?(SCHEME_PREFIX) + + args = resource[SCHEME_OFFSET..].split + method = args.shift + + runner_context = {} + + # NOTE: this adds itself to the runner_context + AwesomeProcess.new(@queue, runner_context, method, :env => params, :params => args) + + runner_context + end + + def status!(runner_context) + # check if it has no output (i.e.: we think it is running) but it is not running + if !runner_context.key?("Output") && !runner_context["thread"]&.alive? + runner_context["Output"] = {"Error" => "Lambda.Unknown", "Cause" => "no output and no thread"} + runner_context["Error"] = true + end + end + + def running?(runner_context) + !runner_context["Output"] + end + + def success?(runner_context) + !runner_context["Error"] + end + + def output(runner_context) + runner_context["Output"] + end + + def cleanup(runner_context) + runner_context["thread"] = nil + end + + def wait(timeout: nil, _events: %i[create update delete]) + # TODO: implement whole interface + raise "wait needs a block and doesn't support timeout" unless timeout.nil? && block_given? + + loop do + event_context = @queue.pop + yield event_context if block_given? + end + end + + # internal methods + + def self.command_error_cause(command_result) + command_result.error.nil? || command_result.error.empty? ? command_result.output.to_s : command_result.error.to_s + end + + def self.populate_results!(runner_context, result: nil, error: nil) + error ||= command_error_cause(result) if result&.failure? + + if error + runner_context["Output"] = {"Error" => "States.TaskFailed", "Cause" => error} + runner_context["Error"] = true + else + runner_context["Output"] = {"Result" => result.output.chomp.split("\n")} + end + + runner_context + end + end +end + +Floe::Runner.register_scheme(Floe::AwesomeRunner::SCHEME, Floe::AwesomeRunner.new) diff --git a/lib/floe/cli.rb b/lib/floe/cli.rb index 1ee91005..80f4e9f1 100644 --- a/lib/floe/cli.rb +++ b/lib/floe/cli.rb @@ -1,5 +1,6 @@ require "floe" require "floe/container_runner" +require "floe/awesome_runner" module Floe class CLI diff --git a/lib/floe/runner.rb b/lib/floe/runner.rb index 3e014bcd..2aae6e0d 100644 --- a/lib/floe/runner.rb +++ b/lib/floe/runner.rb @@ -25,6 +25,10 @@ def for_resource(resource) scheme = resource.split("://").first resolve_scheme(scheme) || raise(ArgumentError, "Invalid resource scheme [#{scheme}]") end + + def runners + @runners.each_value.map { |runner| runner.kind_of?(Proc) ? runner.call : runner } + end end # Run a command asynchronously and create a runner_context diff --git a/lib/floe/workflow.rb b/lib/floe/workflow.rb index 634a5084..f7b0c1ce 100644 --- a/lib/floe/workflow.rb +++ b/lib/floe/workflow.rb @@ -19,16 +19,21 @@ def load(path_or_io, context = nil, credentials = nil, name = nil) def wait(workflows, timeout: nil, &block) workflows = [workflows] if workflows.kind_of?(self) - run_until = Time.now.utc + timeout if timeout.to_i > 0 - ready = [] - queue = Queue.new - wait_thread = Thread.new do - loop do - Runner.for_resource("docker").wait do |event, runner_context| - queue.push([event, runner_context]) + run_until = Time.now.utc + timeout if timeout.to_i > 0 + ready = [] + queue = Queue.new + wait_threads = + Runner.runners.map do |runner| + next unless runner.respond_to?(:wait) + + Thread.new do + loop do + runner.wait do |event, runner_context| + queue.push([event, runner_context]) + end + end end end - end loop do ready = workflows.select(&:step_nonblock_ready?) @@ -73,7 +78,7 @@ def wait(workflows, timeout: nil, &block) ready ensure - wait_thread&.kill + wait_threads.compact.map(&:kill) end end diff --git a/spec/awesome_runner_spec.rb b/spec/awesome_runner_spec.rb new file mode 100644 index 00000000..aafedc32 --- /dev/null +++ b/spec/awesome_runner_spec.rb @@ -0,0 +1,80 @@ +require_relative "../lib/floe/awesome_runner" + +RSpec.describe Floe::AwesomeRunner, :uses_awesome_spawn => true do + let(:subject) { described_class.new(runner_options) } + let(:runner_options) { {} } + let(:container_id) { SecureRandom.hex } + + # let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Parameters" => {"var1.$" => "$.foo.bar"}, "End" => true}}) } + + describe "#run_async!" do + it "raises an exception without a resource" do + expect { subject.run_async!(nil) }.to raise_error(ArgumentError, "Invalid resource") + end + + it "raises an exception for an invalid resource uri" do + expect { subject.run_async!("arn:abcd:efgh") }.to raise_error(ArgumentError, "Invalid resource") + end + + it "calls command run with the command name" do + stub_good_run("ls", :params => [], :env => {}, :output => "file\nlisting\n") + + subject.run_async!("awesome://ls") + subject.queue.pop + end + + it "passes environment variables to command run" do + stub_good_run("ls", :params => [], :env => {"FOO" => "BAR"}, :output => "file\nlisting\n") + + subject.run_async!("awesome://ls", {"FOO" => "BAR"}) + subject.queue.pop + end + end + + # describe "#status!" do + # let(:runner_context) { {"container_ref" => container_id} } + + # it "returns the updated container_state" do + # stub_good_run!("ls", :params => ["inspect", container_id], :output => "[{\"State\": {\"Running\": true}}]") + + # subject.status!(runner_context) + + # expect(runner_context).to include("container_state" => {"Running" => true}) + # end + # end + + describe "#running?" do + # it "retuns true when running" do + # runner_context = {"container_ref" => container_id, "container_state" => {"Running" => true}} + # expect(subject.running?(runner_context)).to be_truthy + # end + + # it "retuns false when not running" do + # runner_context = {"container_ref" => container_id, "container_state" => {"Running" => false, "ExitCode" => 0}} + # expect(subject.running?(runner_context)).to be_falsey + # end + end + + describe "#success?" do + # it "retuns true when successful" do + # runner_context = {"container_ref" => container_id, "container_state" => {"Running" => false, "ExitCode" => 0}} + # expect(subject.success?(runner_context)).to be_truthy + # end + + # it "retuns false when not successful" do + # runner_context = {"container_ref" => container_id, "container_state" => {"Running" => false, "ExitCode" => 1}} + # expect(subject.success?(runner_context)).to be_falsey + # end + end + + describe "#output" do + let(:runner_context) { {"Output" => ["output1", "output2"]} } + + it "returns log output" do + expect(subject.output(runner_context)).to eq(["output1", "output2"]) + end + end + + # describe "#cleanup" do + # end +end