Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

- Rename module `Rapidflow` to `RapidFlow`.

## [0.1.0] - 2025.11.01

### Added
Expand Down
4 changes: 3 additions & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ gemspec
gem "irb"
gem "rake", "~> 13.0"
gem "minitest", "~> 5.0"

# bikeshed-proof linter and formatter. ref: https://github.com/standardrb/standard
gem "standard", "~> 1.3"

# used for one benchmarking scripts
# Used for one of the benchmarking scripts
gem "mini_magick"
52 changes: 26 additions & 26 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# 🌊 Rapidflow
# 🌊 RapidFlow

⚙️💎➡️📦💨🔁🌊
> A Ruby library for concurrent batch data processing through lightweight, composable flows.
Expand All @@ -9,7 +9,7 @@
> **⚠️ Early Development Warning**: This library is at a very early stage of development. The interfaces and APIs
> may change without backward compatibility guarantees. Use in production at your own risk.

Rapidflow is a lightweight, concurrent pipeline processor for Ruby that transforms data through multiple stages like
RapidFlow is a lightweight, concurrent pipeline processor for Ruby that transforms data through multiple stages like
items moving through stages in a rapid flow. Perfect for I/O-bound operations like web scraping, API calls,
and data processing.

Expand Down Expand Up @@ -55,7 +55,7 @@ Create a batch instance.
require 'rapidflow'

# Create a 3-stage processing batch. Workers can be configured per stage basis or will use the default amount if omitted.
scraper = Rapidflow::Batch.build do
scraper = RapidFlow::Batch.build do
stage ->(url) { fetch_html(url) }, workers: 8 # Stage 1: Fetch HTML
stage ->(html) { parse_data(html) }, workers: 2 # Stage 2: Parse data
stage ->(data) { save_to_db(data) } # Stage 3: Save to a database
Expand All @@ -65,7 +65,7 @@ end
Alternatively, you can also initialize the batch with the following syntax:

```ruby
batch = Rapidflow::Batch.new(
batch = RapidFlow::Batch.new(
{ fn: ->(url) { fetch_html(url) }, workers: 8 }, # Stage 1: Fetch HTML.
{ fn: ->(html) { parse_data(html) }, workers: 2 }, # Stage 2: Parse data
{ fn: ->(data) { save_to_db(data) } } # Stage 3: Save to database
Expand Down Expand Up @@ -107,7 +107,7 @@ end
### Web Scraping Pipeline

```ruby
scraper = Rapidflow::Batch.build do
scraper = RapidFlow::Batch.build do
stage ->(url) {
# Fetch HTML (may take 1-2 seconds per URL)
HTTP.get(url).to_s
Expand Down Expand Up @@ -137,7 +137,7 @@ results = scraper.results
### Image Processing Pipeline

```ruby
processor = Rapidflow::Batch.build do
processor = RapidFlow::Batch.build do
stage ->(path) { MiniMagick::Image.open(path) }, workers: 4 # Stage 1: Load image
stage ->(img) { img.resize('800x600'); img }, workers: 4 # Stage 2: Resize
stage ->(img) { img.colorspace('Gray'); img }, workers: 4 # Stage 3: Convert to grayscale
Expand All @@ -153,7 +153,7 @@ puts "Processed #{results.count { |_, err| err.nil? }} images successfully"
### API Data Enrichment

```ruby
enricher = Rapidflow::Batch.build do
enricher = RapidFlow::Batch.build do
stage ->(user_id) {
# Fetch user data from API
api_client.get("/users/#{user_id}").parse
Expand Down Expand Up @@ -182,7 +182,7 @@ enriched_users = enricher.results

```ruby
# Extract, Transform, Load
etl = Rapidflow::Batch.build do
etl = RapidFlow::Batch.build do
stage ->(filename) {
# Extract: Read CSV file
CSV.read(filename, headers: true).map(&:to_h)
Expand Down Expand Up @@ -212,18 +212,18 @@ puts "Loaded #{total_records} records"
```ruby
# Sometimes you just need parallel processing without multiple stages
# Fetch 20 URLs concurrently
fetcher = Rapidflow::Batch.new({ fn: ->(url) { HTTP.get(url).body }, workers: 20 })
fetcher = RapidFlow::Batch.new({ fn: ->(url) { HTTP.get(url).body }, workers: 20 })

urls.each { |url| fetcher.push(url) }
pages = fetcher.results
```

## Error Handling

Rapidflow captures exceptions without stopping the pipeline:
RapidFlow captures exceptions without stopping the pipeline:

```ruby
batch = Rapidflow::Batch.new(
batch = RapidFlow::Batch.new(
{ fn: ->(url) { HTTP.get(url).body } }, # May raise network errors
{ fn: ->(body) { JSON.parse(body) } } # May raise JSON parse errors
)
Expand All @@ -250,7 +250,7 @@ end

## Architecture

Rapidflow uses a multi-stage pipeline architecture with concurrent workers at each stage.
RapidFlow uses a multi-stage pipeline architecture with concurrent workers at each stage.

### Pipeline Flow

Expand Down Expand Up @@ -354,10 +354,10 @@ Choose based on your workload:

```ruby
# High I/O workload - many workers
Rapidflow::Batch.new({ fn: lambda1, workers: 100 }, { fn: lambda2, workers: 50 })
RapidFlow::Batch.new({ fn: lambda1, workers: 100 }, { fn: lambda2, workers: 50 })

# CPU-intensive - fewer workers
Rapidflow::Batch.new({ fn: lambda1, workers: 2 }, { fn: lambda2, workers: 2 })
RapidFlow::Batch.new({ fn: lambda1, workers: 2 }, { fn: lambda2, workers: 2 })
```

### Balancing Workers for Stages
Expand All @@ -366,15 +366,15 @@ For the best throughput, workers should be assigned based on the I/O-bound workl

```ruby
# ❌ Same number of workers even though stages have different I/O duration
Rapidflow::Batch.build do
RapidFlow::Batch.build do
stage ->(x) { sleep(10); x }, workers: 4 # 10 seconds - SLOW! (Assume a heavy or long-running I/O task)
stage ->(x) { sleep(0.1); x }, workers: 4 # 0.1 seconds - fast
stage ->(x) { sleep(0.1); x }, workers: 4 # 0.1 seconds - fast
stage ->(x) { x }, workers: 4 # No I/O bound work
end

# ✅ Balanced - workers are assigned based of I/O load
Rapidflow::Batch.build do
RapidFlow::Batch.build do
stage ->(x) { sleep(10); x }, workers: 16 # 10 seconds - SLOW!
stage ->(x) { sleep(0.1); x }, workers: 2 # 0.1 seconds - fast
stage ->(x) { sleep(0.1); x }, workers: 2 # 0.1 seconds - fast
Expand Down Expand Up @@ -404,11 +404,11 @@ end
- Share mutable state between workers without synchronization
- Push millions of items without processing results (memory issue)
- Create dependencies between items (order of execution not guaranteed)
- Nest Rapidflow instances (use a single multi-stage batch instead)
- Nest RapidFlow instances (use a single multi-stage batch instead)

## Comparison with Alternatives

| Feature | Rapidflow | Thread Pool | Sidekiq | Concurrent-Ruby |
| Feature | RapidFlow | Thread Pool | Sidekiq | Concurrent-Ruby |
|--------------------------|-----------|---------------|----------------|-----------------|
| **Multi-stage pipeline** | ✅ | ❌ | ⚠️ (manual) | ❌ |
| **Order preservation** | ✅ | ❌ | ❌ | ❌ |
Expand All @@ -425,13 +425,13 @@ The following result is taken from a benchmark run of [./scripts/benchmark/bench
```bash
/scripts/benchmark$ ruby benchmark_api_request_process_and_storing.rb 40 32
================================================================================
Rapidflow API Request, Process & Store Benchmark
RapidFlow API Request, Process & Store Benchmark
================================================================================

Configuration:
API: dummyjson.com
User IDs to process: 1 to 40
Workers per stage (Rapidflow): 32
Workers per stage (RapidFlow): 32
Stages: Fetch User → Fetch Product → Merge Data → Save to File

Processing 40 user IDs...
Expand All @@ -448,7 +448,7 @@ Results: 40 successful, 0 failed
2. RAPIDFLOW CONCURRENT PROCESSING
--------------------------------------------------------------------------------
user system total real
Rapidflow (32 workers): 0.217776 0.084002 0.301778 ( 0.612455)
RapidFlow (32 workers): 0.217776 0.084002 0.301778 ( 0.612455)

Results: 40 successful, 0 failed

Expand All @@ -457,7 +457,7 @@ SUMMARY
================================================================================

Synchronous time: 13.18s
Rapidflow time: 0.61s
RapidFlow time: 0.61s

Speedup: 21.52x faster
Time saved: 12.57s
Expand All @@ -467,7 +467,7 @@ Performance gain: 2052.1%
FILE VERIFICATION
--------------------------------------------------------------------------------
Synchronous output: 40 files created
Rapidflow output: 40 files created
RapidFlow output: 40 files created

Sample output file: data_1.json
User ID: 1
Expand All @@ -482,11 +482,11 @@ PERFORMANCE ANALYSIS

Average time per item:
Synchronous: 329.51ms
Rapidflow: 15.31ms
RapidFlow: 15.31ms

Throughput (items/second):
Synchronous: 3.03 items/sec
Rapidflow: 65.31 items/sec
RapidFlow: 65.31 items/sec
```

## Development
Expand All @@ -506,7 +506,7 @@ to be a safe, welcoming space for collaboration, and contributors are expected t

## Code of Conduct

Everyone interacting in the Rapidflow project's codebases, issue trackers, chat rooms and mailing lists is expected
Everyone interacting in the RapidFlow project's codebases, issue trackers, chat rooms and mailing lists is expected
to follow the [code of conduct](https://github.com/sinaru/rapidflow/blob/main/CODE_OF_CONDUCT.md).

## License
Expand Down
3 changes: 2 additions & 1 deletion lib/rapidflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
require_relative "rapidflow/pipeline"
require_relative "rapidflow/work_item"
require_relative "rapidflow/stage"
require_relative "rapidflow/builder_builder"
require_relative "rapidflow/batch"

module Rapidflow
module RapidFlow
end
29 changes: 8 additions & 21 deletions lib/rapidflow/batch.rb
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
# frozen_string_literal: true

module Rapidflow
module RapidFlow
class Batch
class ConfigError < RuntimeError; end
class RunError < RuntimeError; end

# DSL entrypoint
def self.build(&block)
builder = Builder.new
builder = BatchBuilder.new
builder.instance_eval(&block) if block
belt = new(*builder.stages)
belt.start
belt
batch = new(*builder.stages)
batch.start
batch
end

# Initialize with a list of stage configs: { fn: -> (input) { }, workers: Integer }, ...
Expand All @@ -27,13 +27,13 @@ def initialize(*stage_configs)
@locked = false
@locked_mutex = Mutex.new

# to track if belt is running
# to track if batch is running
@running = false
@running_mutex = Mutex.new
end

def start
raise ConfigError, "Unable to start the belt without any stages" if @stages.empty?
raise ConfigError, "Unable to start the batch without any stages" if @stages.empty?

@stages.each(&:start)
mark_run!
Expand All @@ -57,19 +57,6 @@ def results

private

# DSL builder
class Builder
attr_reader :stages

def initialize
@stages = []
end

def stage(lambda_fn, workers: 4)
@stages << { fn: lambda_fn, workers: workers }
end
end

def build_stages
stages = []
@lambdas.each_with_index do |lambda_fn, stage_index|
Expand Down Expand Up @@ -99,7 +86,7 @@ def finalize!

def ensure_not_finalized!
@locked_mutex.synchronize do
raise RunError, "Cannot push to a locked belt when results are requested" if @locked
raise RunError, "Cannot push to a locked batch when results are requested" if @locked
end
end

Expand Down
16 changes: 16 additions & 0 deletions lib/rapidflow/builder_builder.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# frozen_string_literal: true

module RapidFlow
# DSL builder
class BatchBuilder
attr_reader :stages

def initialize
@stages = []
end

def stage(lambda_fn, workers: 4)
@stages << { fn: lambda_fn, workers: workers }
end
end
end
2 changes: 1 addition & 1 deletion lib/rapidflow/counter.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# frozen_string_literal: true

module Rapidflow
module RapidFlow
# Tracks item indices for ordering
class Counter
def initialize
Expand Down
2 changes: 1 addition & 1 deletion lib/rapidflow/pipeline.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# frozen_string_literal: true

module Rapidflow
module RapidFlow
# Manages the queues and worker coordination
class Pipeline
# workers_per_stage can be an Integer (uniform) or an Array per stage
Expand Down
2 changes: 1 addition & 1 deletion lib/rapidflow/stage.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# frozen_string_literal: true

module Rapidflow
module RapidFlow
# Represents a processing stage in the pipeline
class Stage
def initialize(stage_index:, lambda_fn:, workers:, is_final:, pipeline:)
Expand Down
2 changes: 1 addition & 1 deletion lib/rapidflow/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Rapidflow
module RapidFlow
VERSION = "0.1.0"
end
2 changes: 1 addition & 1 deletion lib/rapidflow/work_item.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# frozen_string_literal: true

module Rapidflow
module RapidFlow
# Represents a work item flowing through the pipeline
WorkItem = Struct.new('WorkItem', :index, :data, :error) do
def has_error?
Expand Down
4 changes: 2 additions & 2 deletions rapidbelt.gemspec → rapidflow.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ require_relative "lib/rapidflow/version"

Gem::Specification.new do |spec|
spec.name = "rapidflow"
spec.version = Rapidflow::VERSION
spec.version = RapidFlow::VERSION
spec.authors = ["Sinaru Gunawardena"]
spec.email = ["sinarug@gmail.com"]

Expand All @@ -25,7 +25,7 @@ Gem::Specification.new do |spec|
spec.files = IO.popen(%w[git ls-files -z], chdir: __dir__, err: IO::NULL) do |ls|
ls.readlines("\x0", chomp: true).reject do |f|
(f == gemspec) ||
f.start_with?(*%w[bin/ Gemfile .gitignore .standard.yml])
f.start_with?(*%w[bin/ .github/ Gemfile .gitignore .standard.yml])
end
end
spec.bindir = "bin"
Expand Down
Loading