From 079f91ec6026e64d56daa8ab4c857ba42e62a1ba Mon Sep 17 00:00:00 2001 From: Sinaru Gunawardena Date: Tue, 11 Nov 2025 09:39:04 +0000 Subject: [PATCH 01/10] Update Gemfile comments --- Gemfile | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Gemfile b/Gemfile index ffbef05..8aa3899 100644 --- a/Gemfile +++ b/Gemfile @@ -8,7 +8,9 @@ gemspec gem "irb" gem "rake", "~> 13.0" gem "minitest", "~> 5.0" + +# bikeshed-proof linter and formatter. ref: https://github.com/standardrb/standard gem "standard", "~> 1.3" -# used for one benchmarking scripts +# Used for one of the benchmarking scripts gem "mini_magick" From 00200b6fe5541fbc7c230129c4107c748973f9dc Mon Sep 17 00:00:00 2001 From: Sinaru Gunawardena Date: Tue, 11 Nov 2025 09:39:04 +0000 Subject: [PATCH 02/10] Ignore .github folder in gemspec --- rapidbelt.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rapidbelt.gemspec b/rapidbelt.gemspec index 20e2009..161d5a6 100644 --- a/rapidbelt.gemspec +++ b/rapidbelt.gemspec @@ -25,7 +25,7 @@ Gem::Specification.new do |spec| spec.files = IO.popen(%w[git ls-files -z], chdir: __dir__, err: IO::NULL) do |ls| ls.readlines("\x0", chomp: true).reject do |f| (f == gemspec) || - f.start_with?(*%w[bin/ Gemfile .gitignore .standard.yml]) + f.start_with?(*%w[bin/ .github/ Gemfile .gitignore .standard.yml]) end end spec.bindir = "bin" From ae63d053ec606a44011a072a4a1023ee98b28ffa Mon Sep 17 00:00:00 2001 From: Sinaru Gunawardena Date: Tue, 11 Nov 2025 09:39:04 +0000 Subject: [PATCH 03/10] Rename the gemspec file to rapidflow Originally gem was called rapidbelt --- rapidbelt.gemspec => rapidflow.gemspec | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename rapidbelt.gemspec => rapidflow.gemspec (100%) diff --git a/rapidbelt.gemspec b/rapidflow.gemspec similarity index 100% rename from rapidbelt.gemspec rename to rapidflow.gemspec From fa3ccb6ac8b5be7f10d092238cc85826ce7a5f05 Mon Sep 17 00:00:00 2001 From: Sinaru Gunawardena Date: Tue, 11 Nov 2025 09:39:04 +0000 Subject: [PATCH 04/10] Add tests for Pipeline class --- test/rapidflow/pipeline_test.rb | 67 +++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 test/rapidflow/pipeline_test.rb diff --git a/test/rapidflow/pipeline_test.rb b/test/rapidflow/pipeline_test.rb new file mode 100644 index 0000000..78cf56c --- /dev/null +++ b/test/rapidflow/pipeline_test.rb @@ -0,0 +1,67 @@ +# frozen_string_literal: true + +require "test_helper" + +module Rapidflow + class PipelineTest < Minitest::Test + def test_empty_pipeline + pipeline = Pipeline.new(0, 1) + + pipeline.wait_for_completion + pipeline.shutdown + + assert pipeline.results_empty? + end + + def test_pipeline_with_single_stage + pipeline = Pipeline.new(1, 1) + + pipeline.enqueue(0, "test_item") + + # Simulate stage processing + item = pipeline.dequeue(0) + pipeline.enqueue(1, item.upcase) + pipeline.decrement_active_workers + + result = pipeline.dequeue_result + + assert_equal "TEST_ITEM", result + + pipeline.shutdown + end + + def test_pipeline_queues_created_correctly + pipeline = Pipeline.new(3, 2) + + # Pipeline with 3 stages should have 4 queues (one per stage + results queue) + (0..3).each do |i| + pipeline.enqueue(i, "item_#{i}") + + result = pipeline.dequeue(i) + assert_equal "item_#{i}", result + end + + pipeline.shutdown + end + + def test_active_workers_tracking + pipeline = Pipeline.new(1, 1) + + pipeline.enqueue(0, "item1") + pipeline.enqueue(0, "item2") + + # Simulate processing + pipeline.dequeue(0) + pipeline.decrement_active_workers + + pipeline.dequeue(0) + pipeline.decrement_active_workers + + pipeline.wait_for_completion + + pipeline.shutdown + + assert pipeline.results_empty? + end + end +end From 2bec0e0de23dc5a60073a18d86f00c304fd0d9ea Mon Sep 17 00:00:00 2001 From: Sinaru Gunawardena Date: Tue, 11 Nov 2025 09:39:04 +0000 Subject: [PATCH 05/10] Add tests for Stage class --- test/rapidflow/stage_test.rb | 110 +++++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 test/rapidflow/stage_test.rb diff --git a/test/rapidflow/stage_test.rb b/test/rapidflow/stage_test.rb new file mode 100644 index 0000000..1420c3b --- /dev/null +++ b/test/rapidflow/stage_test.rb @@ -0,0 +1,110 @@ +# frozen_string_literal: true + +require "test_helper" + +module Rapidflow + class StageTest < Minitest::Test + def test_stage_processes_single_item + pipeline = Pipeline.new(1, 1) + work_item = WorkItem.new(data: 5) + + stage = Stage.new( + stage_index: 0, + lambda_fn: ->(data) { data * 2 }, + workers: 1, + is_final: true, + pipeline: pipeline + ) + + stage.start + pipeline.enqueue(0, work_item) + + result = pipeline.dequeue_result + pipeline.shutdown + + assert_equal 10, result.data + refute result.has_error? + end + + def test_stage_handles_errors + pipeline = Pipeline.new(1, 1) + work_item = WorkItem.new(data: "test") + + stage = Stage.new( + stage_index: 0, + lambda_fn: ->(_data) { raise StandardError, "Processing error" }, + workers: 1, + is_final: true, + pipeline: pipeline + ) + + stage.start + pipeline.enqueue(0, work_item) + + result = pipeline.dequeue_result + pipeline.shutdown + + assert_equal "test", result.data + assert result.has_error? + assert_instance_of StandardError, result.error + assert_equal "Processing error", result.error.message + end + + def test_stage_with_multiple_workers + pipeline = Pipeline.new(1, 3) + items = 10.times.map { |i| WorkItem.new(data: i) } + + stage = Stage.new( + stage_index: 0, + lambda_fn: ->(data) { data * 2 }, + workers: 3, + is_final: true, + pipeline: pipeline + ) + + stage.start + items.each { |item| pipeline.enqueue(0, item) } + + results = [] + 10.times { results << pipeline.dequeue_result } + pipeline.shutdown + + assert_equal 10, results.length + results.each do |result| + refute result.has_error? + assert_includes (0..9).map { |i| i * 2 }, result.data + end + end + + def test_stage_forwards_to_next_stage + pipeline = Pipeline.new(2, 1) + work_item = WorkItem.new(data: "hello") + + stage1 = Stage.new( + stage_index: 0, + lambda_fn: ->(data) { data.upcase }, + workers: 1, + is_final: false, + pipeline: pipeline + ) + + stage2 = Stage.new( + stage_index: 1, + lambda_fn: ->(data) { data + "!" }, + workers: 1, + is_final: true, + pipeline: pipeline + ) + + stage1.start + stage2.start + pipeline.enqueue(0, work_item) + + result = pipeline.dequeue_result + pipeline.shutdown + + assert_equal "HELLO!", result.data + refute result.has_error? + end + end +end From 94d085ddaec0a634dfecaa0dc41259799ecaf080 Mon Sep 17 00:00:00 2001 From: Sinaru Gunawardena Date: Tue, 11 Nov 2025 09:39:04 +0000 Subject: [PATCH 06/10] Rename module `Rapidflow` to `RapidFlow` --- CHANGELOG.md | 2 + README.md | 52 +++++++++---------- lib/rapidflow.rb | 2 +- lib/rapidflow/batch.rb | 2 +- lib/rapidflow/counter.rb | 2 +- lib/rapidflow/pipeline.rb | 2 +- lib/rapidflow/stage.rb | 2 +- lib/rapidflow/version.rb | 2 +- lib/rapidflow/work_item.rb | 2 +- rapidflow.gemspec | 2 +- ...nchmark_api_request_process_and_storing.rb | 22 ++++---- scripts/benchmark/benchmark_images.rb | 12 ++--- .../benchmark/simulated_data_processing.rb | 12 ++--- sig/rapidflow.rbs | 2 +- test/rapidflow/batch_test.rb | 2 +- test/rapidflow/counter_test.rb | 2 +- test/rapidflow/pipeline_test.rb | 2 +- test/rapidflow/stage_test.rb | 2 +- test/rapidflow/work_item_test.rb | 2 +- 19 files changed, 65 insertions(+), 63 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 463534e..bd456e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +- Rename module `Rapidflow` to `RapidFlow`. + ## [0.1.0] - 2025.11.01 ### Added diff --git a/README.md b/README.md index fe766b2..d05007b 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# 🌊 Rapidflow +# 🌊 RapidFlow ⚙️💎➡️📦💨🔁🌊 > A Ruby library for concurrent batch data processing through lightweight, composable flows. @@ -9,7 +9,7 @@ > **⚠️ Early Development Warning**: This library is at a very early stage of development. The interfaces and APIs > may change without backward compatibility guarantees. Use in production at your own risk. -Rapidflow is a lightweight, concurrent pipeline processor for Ruby that transforms data through multiple stages like +RapidFlow is a lightweight, concurrent pipeline processor for Ruby that transforms data through multiple stages like items moving through stages in a rapid flow. Perfect for I/O-bound operations like web scraping, API calls, and data processing. @@ -55,7 +55,7 @@ Create a batch instance. require 'rapidflow' # Create a 3-stage processing batch. Workers can be configured per stage basis or will use the default amount if omitted. -scraper = Rapidflow::Batch.build do +scraper = RapidFlow::Batch.build do stage ->(url) { fetch_html(url) }, workers: 8 # Stage 1: Fetch HTML stage ->(html) { parse_data(html) }, workers: 2 # Stage 2: Parse data stage ->(data) { save_to_db(data) } # Stage 3: Save to a database @@ -65,7 +65,7 @@ end Alternatively, you can also initialize the batch with the following syntax: ```ruby -batch = Rapidflow::Batch.new( +batch = RapidFlow::Batch.new( { fn: ->(url) { fetch_html(url) }, workers: 8 }, # Stage 1: Fetch HTML. { fn: ->(html) { parse_data(html) }, workers: 2 }, # Stage 2: Parse data { fn: ->(data) { save_to_db(data) } } # Stage 3: Save to database @@ -107,7 +107,7 @@ end ### Web Scraping Pipeline ```ruby -scraper = Rapidflow::Batch.build do +scraper = RapidFlow::Batch.build do stage ->(url) { # Fetch HTML (may take 1-2 seconds per URL) HTTP.get(url).to_s @@ -137,7 +137,7 @@ results = scraper.results ### Image Processing Pipeline ```ruby -processor = Rapidflow::Batch.build do +processor = RapidFlow::Batch.build do stage ->(path) { MiniMagick::Image.open(path) }, workers: 4 # Stage 1: Load image stage ->(img) { img.resize('800x600'); img }, workers: 4 # Stage 2: Resize stage ->(img) { img.colorspace('Gray'); img }, workers: 4 # Stage 3: Convert to grayscale @@ -153,7 +153,7 @@ puts "Processed #{results.count { |_, err| err.nil? }} images successfully" ### API Data Enrichment ```ruby -enricher = Rapidflow::Batch.build do +enricher = RapidFlow::Batch.build do stage ->(user_id) { # Fetch user data from API api_client.get("/users/#{user_id}").parse @@ -182,7 +182,7 @@ enriched_users = enricher.results ```ruby # Extract, Transform, Load -etl = Rapidflow::Batch.build do +etl = RapidFlow::Batch.build do stage ->(filename) { # Extract: Read CSV file CSV.read(filename, headers: true).map(&:to_h) @@ -212,7 +212,7 @@ puts "Loaded #{total_records} records" ```ruby # Sometimes you just need parallel processing without multiple stages # Fetch 20 URLs concurrently -fetcher = Rapidflow::Batch.new({ fn: ->(url) { HTTP.get(url).body }, workers: 20 }) +fetcher = RapidFlow::Batch.new({ fn: ->(url) { HTTP.get(url).body }, workers: 20 }) urls.each { |url| fetcher.push(url) } pages = fetcher.results @@ -220,10 +220,10 @@ pages = fetcher.results ## Error Handling -Rapidflow captures exceptions without stopping the pipeline: +RapidFlow captures exceptions without stopping the pipeline: ```ruby -batch = Rapidflow::Batch.new( +batch = RapidFlow::Batch.new( { fn: ->(url) { HTTP.get(url).body } }, # May raise network errors { fn: ->(body) { JSON.parse(body) } } # May raise JSON parse errors ) @@ -250,7 +250,7 @@ end ## Architecture -Rapidflow uses a multi-stage pipeline architecture with concurrent workers at each stage. +RapidFlow uses a multi-stage pipeline architecture with concurrent workers at each stage. ### Pipeline Flow @@ -354,10 +354,10 @@ Choose based on your workload: ```ruby # High I/O workload - many workers -Rapidflow::Batch.new({ fn: lambda1, workers: 100 }, { fn: lambda2, workers: 50 }) +RapidFlow::Batch.new({ fn: lambda1, workers: 100 }, { fn: lambda2, workers: 50 }) # CPU-intensive - fewer workers -Rapidflow::Batch.new({ fn: lambda1, workers: 2 }, { fn: lambda2, workers: 2 }) +RapidFlow::Batch.new({ fn: lambda1, workers: 2 }, { fn: lambda2, workers: 2 }) ``` ### Balancing Workers for Stages @@ -366,7 +366,7 @@ For the best throughput, workers should be assigned based on the I/O-bound workl ```ruby # ❌ Same number of workers even though stages have different I/O duration -Rapidflow::Batch.build do +RapidFlow::Batch.build do stage ->(x) { sleep(10); x }, workers: 4 # 10 seconds - SLOW! (Assume a heavy or long-running I/O task) stage ->(x) { sleep(0.1); x }, workers: 4 # 0.1 seconds - fast stage ->(x) { sleep(0.1); x }, workers: 4 # 0.1 seconds - fast @@ -374,7 +374,7 @@ Rapidflow::Batch.build do end # ✅ Balanced - workers are assigned based of I/O load -Rapidflow::Batch.build do +RapidFlow::Batch.build do stage ->(x) { sleep(10); x }, workers: 16 # 10 seconds - SLOW! stage ->(x) { sleep(0.1); x }, workers: 2 # 0.1 seconds - fast stage ->(x) { sleep(0.1); x }, workers: 2 # 0.1 seconds - fast @@ -404,11 +404,11 @@ end - Share mutable state between workers without synchronization - Push millions of items without processing results (memory issue) - Create dependencies between items (order of execution not guaranteed) -- Nest Rapidflow instances (use a single multi-stage batch instead) +- Nest RapidFlow instances (use a single multi-stage batch instead) ## Comparison with Alternatives -| Feature | Rapidflow | Thread Pool | Sidekiq | Concurrent-Ruby | +| Feature | RapidFlow | Thread Pool | Sidekiq | Concurrent-Ruby | |--------------------------|-----------|---------------|----------------|-----------------| | **Multi-stage pipeline** | ✅ | ❌ | ⚠️ (manual) | ❌ | | **Order preservation** | ✅ | ❌ | ❌ | ❌ | @@ -425,13 +425,13 @@ The following result is taken from a benchmark run of [./scripts/benchmark/bench ```bash /scripts/benchmark$ ruby benchmark_api_request_process_and_storing.rb 40 32 ================================================================================ -Rapidflow API Request, Process & Store Benchmark +RapidFlow API Request, Process & Store Benchmark ================================================================================ Configuration: API: dummyjson.com User IDs to process: 1 to 40 - Workers per stage (Rapidflow): 32 + Workers per stage (RapidFlow): 32 Stages: Fetch User → Fetch Product → Merge Data → Save to File Processing 40 user IDs... @@ -448,7 +448,7 @@ Results: 40 successful, 0 failed 2. RAPIDFLOW CONCURRENT PROCESSING -------------------------------------------------------------------------------- user system total real -Rapidflow (32 workers): 0.217776 0.084002 0.301778 ( 0.612455) +RapidFlow (32 workers): 0.217776 0.084002 0.301778 ( 0.612455) Results: 40 successful, 0 failed @@ -457,7 +457,7 @@ SUMMARY ================================================================================ Synchronous time: 13.18s -Rapidflow time: 0.61s +RapidFlow time: 0.61s Speedup: 21.52x faster Time saved: 12.57s @@ -467,7 +467,7 @@ Performance gain: 2052.1% FILE VERIFICATION -------------------------------------------------------------------------------- Synchronous output: 40 files created -Rapidflow output: 40 files created +RapidFlow output: 40 files created Sample output file: data_1.json User ID: 1 @@ -482,11 +482,11 @@ PERFORMANCE ANALYSIS Average time per item: Synchronous: 329.51ms - Rapidflow: 15.31ms + RapidFlow: 15.31ms Throughput (items/second): Synchronous: 3.03 items/sec - Rapidflow: 65.31 items/sec + RapidFlow: 65.31 items/sec ``` ## Development @@ -506,7 +506,7 @@ to be a safe, welcoming space for collaboration, and contributors are expected t ## Code of Conduct -Everyone interacting in the Rapidflow project's codebases, issue trackers, chat rooms and mailing lists is expected +Everyone interacting in the RapidFlow project's codebases, issue trackers, chat rooms and mailing lists is expected to follow the [code of conduct](https://github.com/sinaru/rapidflow/blob/main/CODE_OF_CONDUCT.md). ## License diff --git a/lib/rapidflow.rb b/lib/rapidflow.rb index a6c4fbc..c9374d0 100644 --- a/lib/rapidflow.rb +++ b/lib/rapidflow.rb @@ -7,5 +7,5 @@ require_relative "rapidflow/stage" require_relative "rapidflow/batch" -module Rapidflow +module RapidFlow end diff --git a/lib/rapidflow/batch.rb b/lib/rapidflow/batch.rb index 1987eaf..98d877f 100644 --- a/lib/rapidflow/batch.rb +++ b/lib/rapidflow/batch.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -module Rapidflow +module RapidFlow class Batch class ConfigError < RuntimeError; end class RunError < RuntimeError; end diff --git a/lib/rapidflow/counter.rb b/lib/rapidflow/counter.rb index 11ab520..16cf7f9 100644 --- a/lib/rapidflow/counter.rb +++ b/lib/rapidflow/counter.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -module Rapidflow +module RapidFlow # Tracks item indices for ordering class Counter def initialize diff --git a/lib/rapidflow/pipeline.rb b/lib/rapidflow/pipeline.rb index c4015f9..c2bc53f 100644 --- a/lib/rapidflow/pipeline.rb +++ b/lib/rapidflow/pipeline.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -module Rapidflow +module RapidFlow # Manages the queues and worker coordination class Pipeline # workers_per_stage can be an Integer (uniform) or an Array per stage diff --git a/lib/rapidflow/stage.rb b/lib/rapidflow/stage.rb index e6d9073..cf209d1 100644 --- a/lib/rapidflow/stage.rb +++ b/lib/rapidflow/stage.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -module Rapidflow +module RapidFlow # Represents a processing stage in the pipeline class Stage def initialize(stage_index:, lambda_fn:, workers:, is_final:, pipeline:) diff --git a/lib/rapidflow/version.rb b/lib/rapidflow/version.rb index 7ac7cce..71e3bf3 100644 --- a/lib/rapidflow/version.rb +++ b/lib/rapidflow/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true -module Rapidflow +module RapidFlow VERSION = "0.1.0" end diff --git a/lib/rapidflow/work_item.rb b/lib/rapidflow/work_item.rb index 58f3577..f016d70 100644 --- a/lib/rapidflow/work_item.rb +++ b/lib/rapidflow/work_item.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -module Rapidflow +module RapidFlow # Represents a work item flowing through the pipeline WorkItem = Struct.new('WorkItem', :index, :data, :error) do def has_error? diff --git a/rapidflow.gemspec b/rapidflow.gemspec index 161d5a6..ef7b85f 100644 --- a/rapidflow.gemspec +++ b/rapidflow.gemspec @@ -4,7 +4,7 @@ require_relative "lib/rapidflow/version" Gem::Specification.new do |spec| spec.name = "rapidflow" - spec.version = Rapidflow::VERSION + spec.version = RapidFlow::VERSION spec.authors = ["Sinaru Gunawardena"] spec.email = ["sinarug@gmail.com"] diff --git a/scripts/benchmark/benchmark_api_request_process_and_storing.rb b/scripts/benchmark/benchmark_api_request_process_and_storing.rb index 3a75688..ef2c89a 100644 --- a/scripts/benchmark/benchmark_api_request_process_and_storing.rb +++ b/scripts/benchmark/benchmark_api_request_process_and_storing.rb @@ -82,11 +82,11 @@ def process_data_synchronously(user_ids, output_dir) results end -# Solution 2: Rapidflow concurrent processing +# Solution 2: RapidFlow concurrent processing def process_data_with_rapidflow(user_ids, output_dir, workers: 8) FileUtils.mkdir_p(output_dir) - belt = Rapidflow::Batch.build do + belt = RapidFlow::Batch.build do # Stage 1: Fetch user data from API stage ->(user_id) { ApiClient.fetch_user(user_id) @@ -119,13 +119,13 @@ def process_data_with_rapidflow(user_ids, output_dir, workers: 8) # Run benchmark def run_benchmark(max_user_id: 30, workers: 8) puts "=" * 80 - puts "Rapidflow API Request, Process & Store Benchmark" + puts "RapidFlow API Request, Process & Store Benchmark" puts "=" * 80 puts puts "Configuration:" puts " API: dummyjson.com" puts " User IDs to process: 1 to #{max_user_id}" - puts " Workers per stage (Rapidflow): #{workers}" + puts " Workers per stage (RapidFlow): #{workers}" puts " Stages: Fetch User → Fetch Product → Merge Data → Save to File" puts @@ -167,7 +167,7 @@ def run_benchmark(max_user_id: 30, workers: 8) end puts - # Benchmark Rapidflow + # Benchmark RapidFlow puts "-" * 80 puts "2. RAPIDFLOW CONCURRENT PROCESSING" puts "-" * 80 @@ -176,7 +176,7 @@ def run_benchmark(max_user_id: 30, workers: 8) rapidflow_results = nil Benchmark.bm(30) do |x| - rapidflow_time = x.report("Rapidflow (#{workers} workers):") do + rapidflow_time = x.report("RapidFlow (#{workers} workers):") do rapidflow_results = process_data_with_rapidflow(user_ids, "tmp/output_rapidflow", workers: workers) end end @@ -210,7 +210,7 @@ def run_benchmark(max_user_id: 30, workers: 8) puts "=" * 80 puts puts "Synchronous time: #{sync_real_time.round(2)}s" - puts "Rapidflow time: #{rapidflow_real_time.round(2)}s" + puts "RapidFlow time: #{rapidflow_real_time.round(2)}s" puts puts "Speedup: #{speedup.round(2)}x faster" puts "Time saved: #{time_saved.round(2)}s" @@ -226,7 +226,7 @@ def run_benchmark(max_user_id: 30, workers: 8) rapidflow_files = Dir.glob("tmp/output_rapidflow/data_*.json").length puts "Synchronous output: #{sync_files} files created" - puts "Rapidflow output: #{rapidflow_files} files created" + puts "RapidFlow output: #{rapidflow_files} files created" puts # Sample file content verification @@ -256,7 +256,7 @@ def run_benchmark(max_user_id: 30, workers: 8) puts "Average time per item:" puts " Synchronous: #{(avg_time_per_item_sync * 1000).round(2)}ms" - puts " Rapidflow: #{(avg_time_per_item_rapid * 1000).round(2)}ms" + puts " RapidFlow: #{(avg_time_per_item_rapid * 1000).round(2)}ms" puts throughput_sync = max_user_id / sync_real_time @@ -264,7 +264,7 @@ def run_benchmark(max_user_id: 30, workers: 8) puts "Throughput (items/second):" puts " Synchronous: #{throughput_sync.round(2)} items/sec" - puts " Rapidflow: #{throughput_rapid.round(2)} items/sec" + puts " RapidFlow: #{throughput_rapid.round(2)} items/sec" puts # Cleanup prompt @@ -273,7 +273,7 @@ def run_benchmark(max_user_id: 30, workers: 8) puts "-" * 80 puts puts "Synchronous files: tmp/output_sync/" - puts "Rapidflow files: tmp/output_rapidflow/" + puts "RapidFlow files: tmp/output_rapidflow/" puts puts "To clean up output directories, run:" puts " rm -rf tmp/output_sync tmp/output_rapidflow" diff --git a/scripts/benchmark/benchmark_images.rb b/scripts/benchmark/benchmark_images.rb index 24b3bd1..987ccb9 100644 --- a/scripts/benchmark/benchmark_images.rb +++ b/scripts/benchmark/benchmark_images.rb @@ -89,11 +89,11 @@ def process_images_synchronously(image_paths, output_dir) results end -# Solution 2: Rapidflow concurrent processing +# Solution 2: RapidFlow concurrent processing def process_images_with_rapidflow(image_paths, output_dir, workers: 4) FileUtils.mkdir_p(output_dir) - belt = Rapidflow::Batch.build do + belt = RapidFlow::Batch.build do # Stage 1: Load image stage ->(path) { ImageProcessor.load_image(path) }, workers: workers @@ -114,7 +114,7 @@ def process_images_with_rapidflow(image_paths, output_dir, workers: 4) # Run benchmark def run_benchmark(sample_image_path, image_count: 50, workers: 4) puts "=" * 80 - puts "Rapidflow Image Processing Benchmark" + puts "RapidFlow Image Processing Benchmark" puts "=" * 80 puts puts "Configuration:" @@ -157,7 +157,7 @@ def run_benchmark(sample_image_path, image_count: 50, workers: 4) end puts - # Benchmark Rapidflow + # Benchmark RapidFlow puts "-" * 80 puts "2. RAPIDFLOW CONCURRENT PROCESSING" puts "-" * 80 @@ -166,7 +166,7 @@ def run_benchmark(sample_image_path, image_count: 50, workers: 4) rapidflow_results = nil Benchmark.bm(30) do |x| - rapidflow_time = x.report("Rapidflow (#{workers} workers):") do + rapidflow_time = x.report("RapidFlow (#{workers} workers):") do rapidflow_results = process_images_with_rapidflow(image_paths, "tmp/output_rapidflow", workers: workers) end end @@ -191,7 +191,7 @@ def run_benchmark(sample_image_path, image_count: 50, workers: 4) puts "=" * 80 puts puts "Synchronous time: #{sync_real_time.round(2)}s" - puts "Rapidflow time: #{rapidflow_real_time.round(2)}s" + puts "RapidFlow time: #{rapidflow_real_time.round(2)}s" puts puts "Speedup: #{speedup.round(2)}x faster" puts "Time saved: #{time_saved.round(2)}s" diff --git a/scripts/benchmark/simulated_data_processing.rb b/scripts/benchmark/simulated_data_processing.rb index 96c3bf5..12fefa3 100644 --- a/scripts/benchmark/simulated_data_processing.rb +++ b/scripts/benchmark/simulated_data_processing.rb @@ -109,9 +109,9 @@ def process_data_synchronously(urls) results end -# Solution 2: Rapidflow concurrent processing +# Solution 2: RapidFlow concurrent processing def process_data_with_rapidflow(urls, workers: 4) - belt = Rapidflow::Batch.build do + belt = RapidFlow::Batch.build do stage ->(url) { DataProcessor.fetch_html(url) }, workers: workers # Station 1: Fetch HTML stage ->(html) { DataProcessor.parse_data(html) }, workers: workers # Station 2: Parse data stage ->(data) { DataProcessor.fetch_other_data(data) }, workers: workers # Station 3: Fetch other data @@ -126,7 +126,7 @@ def process_data_with_rapidflow(urls, workers: 4) # Run benchmark def run_benchmark(url_count: 50, workers: 4) puts "=" * 80 - puts "Rapidflow Data Processing Benchmark" + puts "RapidFlow Data Processing Benchmark" puts "=" * 80 puts puts "Configuration:" @@ -163,7 +163,7 @@ def run_benchmark(url_count: 50, workers: 4) puts "Results: #{sync_success} successful, #{sync_failed} failed" puts - # Benchmark Rapidflow + # Benchmark RapidFlow puts "-" * 80 puts "2. RAPIDFLOW CONCURRENT PROCESSING" puts "-" * 80 @@ -172,7 +172,7 @@ def run_benchmark(url_count: 50, workers: 4) rapidflow_results = nil Benchmark.bm(30) do |x| - rapidflow_time = x.report("Rapidflow (#{workers} workers):") do + rapidflow_time = x.report("RapidFlow (#{workers} workers):") do rapidflow_results = process_data_with_rapidflow(urls, workers: workers) end end @@ -197,7 +197,7 @@ def run_benchmark(url_count: 50, workers: 4) puts "=" * 80 puts puts "Synchronous time: #{sync_real_time.round(2)}s" - puts "Rapidflow time: #{rapidflow_real_time.round(2)}s" + puts "RapidFlow time: #{rapidflow_real_time.round(2)}s" puts puts "Speedup: #{speedup.round(2)}x faster" puts "Time saved: #{time_saved.round(2)}s" diff --git a/sig/rapidflow.rbs b/sig/rapidflow.rbs index fe9c292..2c1ccd1 100644 --- a/sig/rapidflow.rbs +++ b/sig/rapidflow.rbs @@ -1,4 +1,4 @@ -module Rapidflow +module RapidFlow VERSION: String # See the writing guide of rbs: https://github.com/ruby/rbs#guides end diff --git a/test/rapidflow/batch_test.rb b/test/rapidflow/batch_test.rb index 6a20906..e994e9b 100644 --- a/test/rapidflow/batch_test.rb +++ b/test/rapidflow/batch_test.rb @@ -1,6 +1,6 @@ require "test_helper" -module Rapidflow +module RapidFlow class BatchTest < Minitest::Test def test_basic_functionality_with_arg_tasks belt = Batch.new( diff --git a/test/rapidflow/counter_test.rb b/test/rapidflow/counter_test.rb index 6ca9f6e..26b9bdb 100644 --- a/test/rapidflow/counter_test.rb +++ b/test/rapidflow/counter_test.rb @@ -1,6 +1,6 @@ require "test_helper" -module Rapidflow +module RapidFlow class CounterTest < Minitest::Test def test_sequential_indices counter = Counter.new diff --git a/test/rapidflow/pipeline_test.rb b/test/rapidflow/pipeline_test.rb index 78cf56c..827c087 100644 --- a/test/rapidflow/pipeline_test.rb +++ b/test/rapidflow/pipeline_test.rb @@ -2,7 +2,7 @@ require "test_helper" -module Rapidflow +module RapidFlow class PipelineTest < Minitest::Test def test_empty_pipeline pipeline = Pipeline.new(0, 1) diff --git a/test/rapidflow/stage_test.rb b/test/rapidflow/stage_test.rb index 1420c3b..63e10ae 100644 --- a/test/rapidflow/stage_test.rb +++ b/test/rapidflow/stage_test.rb @@ -2,7 +2,7 @@ require "test_helper" -module Rapidflow +module RapidFlow class StageTest < Minitest::Test def test_stage_processes_single_item pipeline = Pipeline.new(1, 1) diff --git a/test/rapidflow/work_item_test.rb b/test/rapidflow/work_item_test.rb index d121d0b..947325e 100644 --- a/test/rapidflow/work_item_test.rb +++ b/test/rapidflow/work_item_test.rb @@ -1,6 +1,6 @@ require "test_helper" -module Rapidflow +module RapidFlow class WorkItemTest < Minitest::Test def test_default_initialization item = WorkItem.new From e982cc71864c501f00cc535d8f0d31f9db44853c Mon Sep 17 00:00:00 2001 From: Sinaru Gunawardena Date: Tue, 11 Nov 2025 09:39:04 +0000 Subject: [PATCH 07/10] Extract batch builder class --- lib/rapidflow.rb | 1 + lib/rapidflow/batch.rb | 15 +-------------- lib/rapidflow/builder_builder.rb | 16 ++++++++++++++++ 3 files changed, 18 insertions(+), 14 deletions(-) create mode 100644 lib/rapidflow/builder_builder.rb diff --git a/lib/rapidflow.rb b/lib/rapidflow.rb index c9374d0..b5185f5 100644 --- a/lib/rapidflow.rb +++ b/lib/rapidflow.rb @@ -5,6 +5,7 @@ require_relative "rapidflow/pipeline" require_relative "rapidflow/work_item" require_relative "rapidflow/stage" +require_relative "rapidflow/builder_builder" require_relative "rapidflow/batch" module RapidFlow diff --git a/lib/rapidflow/batch.rb b/lib/rapidflow/batch.rb index 98d877f..d87cedd 100644 --- a/lib/rapidflow/batch.rb +++ b/lib/rapidflow/batch.rb @@ -7,7 +7,7 @@ class RunError < RuntimeError; end # DSL entrypoint def self.build(&block) - builder = Builder.new + builder = BatchBuilder.new builder.instance_eval(&block) if block belt = new(*builder.stages) belt.start @@ -57,19 +57,6 @@ def results private - # DSL builder - class Builder - attr_reader :stages - - def initialize - @stages = [] - end - - def stage(lambda_fn, workers: 4) - @stages << { fn: lambda_fn, workers: workers } - end - end - def build_stages stages = [] @lambdas.each_with_index do |lambda_fn, stage_index| diff --git a/lib/rapidflow/builder_builder.rb b/lib/rapidflow/builder_builder.rb new file mode 100644 index 0000000..dfa5c2a --- /dev/null +++ b/lib/rapidflow/builder_builder.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +module RapidFlow + # DSL builder + class BatchBuilder + attr_reader :stages + + def initialize + @stages = [] + end + + def stage(lambda_fn, workers: 4) + @stages << { fn: lambda_fn, workers: workers } + end + end +end From e0df6decc51f5a12d17671b35d29d5b3b8333722 Mon Sep 17 00:00:00 2001 From: Sinaru Gunawardena Date: Tue, 11 Nov 2025 09:39:04 +0000 Subject: [PATCH 08/10] Rename belt to batch --- lib/rapidflow/batch.rb | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/rapidflow/batch.rb b/lib/rapidflow/batch.rb index d87cedd..83e04c9 100644 --- a/lib/rapidflow/batch.rb +++ b/lib/rapidflow/batch.rb @@ -9,9 +9,9 @@ class RunError < RuntimeError; end def self.build(&block) builder = BatchBuilder.new builder.instance_eval(&block) if block - belt = new(*builder.stages) - belt.start - belt + batch = new(*builder.stages) + batch.start + batch end # Initialize with a list of stage configs: { fn: -> (input) { }, workers: Integer }, ... @@ -27,13 +27,13 @@ def initialize(*stage_configs) @locked = false @locked_mutex = Mutex.new - # to track if belt is running + # to track if batch is running @running = false @running_mutex = Mutex.new end def start - raise ConfigError, "Unable to start the belt without any stages" if @stages.empty? + raise ConfigError, "Unable to start the batch without any stages" if @stages.empty? @stages.each(&:start) mark_run! @@ -86,7 +86,7 @@ def finalize! def ensure_not_finalized! @locked_mutex.synchronize do - raise RunError, "Cannot push to a locked belt when results are requested" if @locked + raise RunError, "Cannot push to a locked batch when results are requested" if @locked end end From e3ff060795693083a22451392f5cfd25bf5f3ced Mon Sep 17 00:00:00 2001 From: Sinaru Gunawardena Date: Tue, 11 Nov 2025 09:39:04 +0000 Subject: [PATCH 09/10] Fix error message check in tests --- test/rapidflow/batch_test.rb | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/test/rapidflow/batch_test.rb b/test/rapidflow/batch_test.rb index e994e9b..4cd5cce 100644 --- a/test/rapidflow/batch_test.rb +++ b/test/rapidflow/batch_test.rb @@ -39,18 +39,22 @@ def test_basic_functionality_with_build end def test_no_stages_with_build - assert_raises(Batch::ConfigError, "Unable to start the belt without any stages") do + error = assert_raises(Batch::ConfigError) do Batch.build do # no stages end end + + assert_equal "Unable to start the batch without any stages", error.message end def test_no_stages_belt_start - assert_raises(Batch::ConfigError, "Unable to start the belt without any stages") do + error = assert_raises(Batch::ConfigError) do belt = Batch.new belt.start end + + assert_equal "Unable to start the batch without any stages", error.message end def test_concurrent_execution_is_faster_than_sequential @@ -196,9 +200,9 @@ def test_cannot_push_after_results_called belt.push("item1") belt.results - assert_raises(Batch::RunError, "Cannot push to a locked belt when results are requested") do - belt.push("item2") - end + error = assert_raises(Batch::RunError) { belt.push("item2") } + + assert_equal "Cannot push to a locked batch when results are requested", error.message end def test_results_waits_for_all_processing_to_complete From 692f10b8eba2e21259da85e104f849adf38502c1 Mon Sep 17 00:00:00 2001 From: Sinaru Gunawardena Date: Tue, 11 Nov 2025 09:47:01 +0000 Subject: [PATCH 10/10] Replace 'belt' to 'batch' --- test/rapidflow/batch_test.rb | 196 +++++++++++++++++------------------ 1 file changed, 98 insertions(+), 98 deletions(-) diff --git a/test/rapidflow/batch_test.rb b/test/rapidflow/batch_test.rb index 4cd5cce..e3ce7fc 100644 --- a/test/rapidflow/batch_test.rb +++ b/test/rapidflow/batch_test.rb @@ -3,16 +3,16 @@ module RapidFlow class BatchTest < Minitest::Test def test_basic_functionality_with_arg_tasks - belt = Batch.new( + batch = Batch.new( { fn: ->(data) { data.upcase }, workers: 4 }, { fn: ->(data) { data + "!" }, workers: 4 } ) - belt.start + batch.start - belt.push("hello") - belt.push("world") + batch.push("hello") + batch.push("world") - results = belt.results + results = batch.results assert_equal 2, results.length assert_equal ["HELLO!", nil], results[0] @@ -20,7 +20,7 @@ def test_basic_functionality_with_arg_tasks end def test_basic_functionality_with_build - belt = Batch.build do + batch = Batch.build do # first stage to up case string stage ->(data) { data.upcase } @@ -28,10 +28,10 @@ def test_basic_functionality_with_build stage ->(data) { data + "!" } end - belt.push("hello") - belt.push("world") + batch.push("hello") + batch.push("world") - results = belt.results + results = batch.results assert_equal 2, results.length assert_equal ["HELLO!", nil], results[0] @@ -48,10 +48,10 @@ def test_no_stages_with_build assert_equal "Unable to start the batch without any stages", error.message end - def test_no_stages_belt_start + def test_no_stages_batch_start error = assert_raises(Batch::ConfigError) do - belt = Batch.new - belt.start + batch = Batch.new + batch.start end assert_equal "Unable to start the batch without any stages", error.message @@ -63,7 +63,7 @@ def test_concurrent_execution_is_faster_than_sequential # - Sequential would take: 4 items * 0.5s * 2 stages = 4 seconds # - Concurrent (4 workers per stage) should take: max(0.5s, 0.5s) = ~0.5-1s - belt = Batch.build do + batch = Batch.build do stage ->(data) { sleep(0.5) data @@ -76,8 +76,8 @@ def test_concurrent_execution_is_faster_than_sequential start_time = Time.now - 4.times { |i| belt.push(i) } - results = belt.results + 4.times { |i| batch.push(i) } + results = batch.results elapsed = Time.now - start_time @@ -93,7 +93,7 @@ def test_parallel_processing_at_each_stage stage1_executing = [] stage2_executing = [] - belt = Batch.build do + batch = Batch.build do stage ->(data) { execution_tracker.synchronize { stage1_executing << data } sleep(0.3) @@ -109,7 +109,7 @@ def test_parallel_processing_at_each_stage end # Push multiple items quickly - 10.times { |i| belt.push(i) } + 10.times { |i| batch.push(i) } # Give threads time to start processing sleep(0.1) @@ -121,7 +121,7 @@ def test_parallel_processing_at_each_stage "Expected concurrent execution, but only #{stage1_executing.length} items processing" end - results = belt.results + results = batch.results assert_equal 10, results.length end @@ -129,7 +129,7 @@ def test_pipeline_stages_process_independently # Track execution order to verify pipeline behavior execution_log = Queue.new - belt = Batch.build do + batch = Batch.build do stage ->(data) { execution_log.push("stage1_start_#{data}") sleep(0.2) @@ -144,11 +144,11 @@ def test_pipeline_stages_process_independently } end - belt.push("A") + batch.push("A") sleep(0.1) # Let A start processing - belt.push("B") + batch.push("B") - belt.results + batch.results # Convert log to array log = [] @@ -167,7 +167,7 @@ def test_pipeline_stages_process_independently end def test_error_handling_captures_exceptions - belt = Batch.build do + batch = Batch.build do stage ->(data) { raise "Error in stage 1" if data == "bad" data @@ -175,10 +175,10 @@ def test_error_handling_captures_exceptions stage ->(data) { data.upcase } end - belt.push("good") - belt.push("bad") + batch.push("good") + batch.push("bad") - results = belt.results + results = batch.results assert_equal 2, results.length @@ -193,14 +193,14 @@ def test_error_handling_captures_exceptions end def test_cannot_push_after_results_called - belt = Batch.build do + batch = Batch.build do stage ->(data) { data } end - belt.push("item1") - belt.results + batch.push("item1") + batch.results - error = assert_raises(Batch::RunError) { belt.push("item2") } + error = assert_raises(Batch::RunError) { batch.push("item2") } assert_equal "Cannot push to a locked batch when results are requested", error.message end @@ -208,7 +208,7 @@ def test_cannot_push_after_results_called def test_results_waits_for_all_processing_to_complete completion_times = Queue.new - belt = Batch.build do + batch = Batch.build do stage ->(data) { sleep(0.5) data @@ -219,11 +219,11 @@ def test_results_waits_for_all_processing_to_complete } end - belt.push("item1") - belt.push("item2") + batch.push("item1") + batch.push("item2") Time.now - results = belt.results + results = batch.results results_end = Time.now # All items should have completed before results returns @@ -241,7 +241,7 @@ def test_results_waits_for_all_processing_to_complete def test_high_throughput_with_many_items item_count = 100 - belt = Batch.build do + batch = Batch.build do stage ->(data) { sleep(0.01) data * 2 @@ -253,8 +253,8 @@ def test_high_throughput_with_many_items end start_time = Time.now - item_count.times { |i| belt.push(i) } - results = belt.results + item_count.times { |i| batch.push(i) } + results = batch.results elapsed = Time.now - start_time assert_equal item_count, results.length @@ -270,7 +270,7 @@ def test_high_throughput_with_many_items end def test_three_stage_pipeline - belt = Batch.build do + batch = Batch.build do stage ->(data) { sleep(0.1) data.upcase @@ -285,10 +285,10 @@ def test_three_stage_pipeline } end - belt.push("hello") - belt.push("world") + batch.push("hello") + batch.push("world") - results = belt.results + results = batch.results assert_equal 2, results.length assert_equal ["HELLO!HELLO!", nil], results[0] @@ -297,7 +297,7 @@ def test_three_stage_pipeline def test_results_preserve_input_order # Even though items complete at different times, results should match push order - belt = Batch.build do + batch = Batch.build do stage ->(data) { # Make later items finish faster sleep_time = (data[:id] == 0) ? 0.5 : 0.1 @@ -308,9 +308,9 @@ def test_results_preserve_input_order # Push items in order 0, 1, 2, 3 # But item 0 will take longer to complete - 4.times { |i| belt.push({ id: i }) } + 4.times { |i| batch.push({ id: i }) } - results = belt.results + results = batch.results # Results should still be in order 0, 1, 2, 3 assert_equal 4, results.length @@ -321,14 +321,14 @@ def test_results_preserve_input_order end def test_single_stage_pipeline - belt = Batch.build do + batch = Batch.build do stage ->(data) { data * 2 } end - belt.push(5) - belt.push(10) + batch.push(5) + batch.push(10) - results = belt.results + results = batch.results assert_equal 2, results.length assert_equal [10, nil], results[0] @@ -336,15 +336,15 @@ def test_single_stage_pipeline end def test_empty_pipeline - belt = Batch.build { stage ->(_data) { } } + batch = Batch.build { stage ->(_data) { } } - results = belt.results + results = batch.results assert_equal 0, results.length end def test_error_in_middle_stage - belt = Batch.build do + batch = Batch.build do stage ->(data) { data.upcase } stage ->(data) { raise "Error in stage 2" if data == "BAD" @@ -353,11 +353,11 @@ def test_error_in_middle_stage stage ->(data) { data + "!" } end - belt.push("good") - belt.push("bad") - belt.push("also_good") + batch.push("good") + batch.push("bad") + batch.push("also_good") - results = belt.results + results = batch.results assert_equal 3, results.length assert_equal ["GOOD!", nil], results[0] @@ -367,7 +367,7 @@ def test_error_in_middle_stage end def test_error_in_last_stage - belt = Batch.build do + batch = Batch.build do stage ->(data) { data.upcase } stage ->(data) { raise "Error in final stage" if data == "BAD" @@ -375,10 +375,10 @@ def test_error_in_last_stage } end - belt.push("good") - belt.push("bad") + batch.push("good") + batch.push("bad") - results = belt.results + results = batch.results assert_equal 2, results.length assert_equal ["GOOD", nil], results[0] @@ -387,19 +387,19 @@ def test_error_in_last_stage end def test_multiple_errors_in_sequence - belt = Batch.build do + batch = Batch.build do stage ->(data) { raise "Error at #{data}" if data.start_with?("bad") data } end - belt.push("good1") - belt.push("bad1") - belt.push("bad2") - belt.push("good2") + batch.push("good1") + batch.push("bad1") + batch.push("bad2") + batch.push("good2") - results = belt.results + results = batch.results assert_equal 4, results.length assert_equal ["good1", nil], results[0] @@ -435,15 +435,15 @@ def test_different_worker_counts end def test_complex_data_types - belt = Batch.build do + batch = Batch.build do stage ->(data) { { original: data, processed: true } } stage ->(data) { data.merge(stage2: Time.now.to_i) } end - belt.push({ id: 1, name: "test" }) - belt.push([1, 2, 3]) + batch.push({ id: 1, name: "test" }) + batch.push([1, 2, 3]) - results = belt.results + results = batch.results assert_equal 2, results.length assert results[0][0].is_a?(Hash) @@ -453,15 +453,15 @@ def test_complex_data_types end def test_nil_values - belt = Batch.build do + batch = Batch.build do stage ->(data) { data.nil? ? "was_nil" : data } stage ->(data) { data.upcase } end - belt.push(nil) - belt.push("hello") + batch.push(nil) + batch.push("hello") - results = belt.results + results = batch.results assert_equal 2, results.length assert_equal ["WAS_NIL", nil], results[0] @@ -471,15 +471,15 @@ def test_nil_values def test_large_dataset_stress_test item_count = 500 - belt = Batch.build do + batch = Batch.build do stage ->(data) { data * 2 }, workers: 8 stage ->(data) { data + 1 }, workers: 8 stage ->(data) { data.to_s }, workers: 8 end - item_count.times { |i| belt.push(i) } + item_count.times { |i| batch.push(i) } - results = belt.results + results = batch.results assert_equal item_count, results.length @@ -492,7 +492,7 @@ def test_large_dataset_stress_test def test_varying_processing_times # Simulate real-world scenario with varying processing times - belt = Batch.build do + batch = Batch.build do stage ->(data) { sleep(rand * 0.1) # Random 0-100ms data.upcase @@ -504,9 +504,9 @@ def test_varying_processing_times end words = %w[apple banana cherry date elderberry fig grape] - words.each { |word| belt.push(word) } + words.each { |word| batch.push(word) } - results = belt.results + results = batch.results assert_equal words.length, results.length words.each_with_index do |word, i| @@ -516,7 +516,7 @@ def test_varying_processing_times end def test_exception_types_preserved - belt = Batch.build do + batch = Batch.build do stage ->(data) { case data when "argument_error" @@ -531,12 +531,12 @@ def test_exception_types_preserved } end - belt.push("good") - belt.push("argument_error") - belt.push("runtime_error") - belt.push("custom_error") + batch.push("good") + batch.push("argument_error") + batch.push("runtime_error") + batch.push("custom_error") - results = belt.results + results = batch.results assert_equal 4, results.length assert_equal ["good", nil], results[0] @@ -546,13 +546,13 @@ def test_exception_types_preserved end def test_all_items_fail - belt = Batch.build do + batch = Batch.build do stage ->(data) { raise "Always fails" } end - 5.times { |i| belt.push(i) } + 5.times { |i| batch.push(i) } - results = belt.results + results = batch.results assert_equal 5, results.length results.each do |result, error| @@ -562,14 +562,14 @@ def test_all_items_fail end def test_push_many_items_quickly - belt = Batch.build do + batch = Batch.build do stage ->(data) { data } end # Push 1000 items as fast as possible - 1000.times { |i| belt.push(i) } + 1000.times { |i| batch.push(i) } - results = belt.results + results = batch.results assert_equal 1000, results.length # Verify order is maintained @@ -579,22 +579,22 @@ def test_push_many_items_quickly end def test_idempotent_results_calls_not_allowed - belt = Batch.build do + batch = Batch.build do stage ->(data) { data } end - belt.push(1) - belt.results + batch.push(1) + batch.results # Can't call results again or push again - assert_raises(RuntimeError) { belt.push(2) } + assert_raises(RuntimeError) { batch.push(2) } end def test_thread_safety_of_shared_state shared_counter = { count: 0 } mutex = Mutex.new - belt = Batch.build do + batch = Batch.build do stage ->(data) { # Safely increment shared counter mutex.synchronize { shared_counter[:count] += 1 } @@ -602,8 +602,8 @@ def test_thread_safety_of_shared_state }, workers: 10 end - 100.times { |i| belt.push(i) } - results = belt.results + 100.times { |i| batch.push(i) } + results = batch.results assert_equal 100, results.length assert_equal 100, shared_counter[:count]