GoBatch

How GoBatch Works

GoBatch is a flexible and efficient batch processing library for Go, designed to streamline the processing of large volumes of data. It provides a framework for batch processing while allowing users to define their own data sources and processing logic.

NOTE: GoBatch is considered a version 0 release and is in an unstable state. Compatibility may be broken at any time on the master branch. If you need a stable release, wait for version 1.

Latest Release - v0.5.0

Version 0.5.0 introduces a hard switch to generic APIs across the library:

  • BREAKING: Batch, Item, Source, and Processor are now generic (Batch[T], Item[T], Source[T], Processor[T]).
  • BREAKING: Source.Read now returns typed item channels: Read(ctx) (<-chan T, <-chan error).
  • BREAKING: Processor.Process now uses typed items: Process(ctx, []*Item[T]) ([]*Item[T], error).
  • Built-in sources and processors now require type parameters (for example: source.Channel[int], processor.Transform[string]).

See the CHANGELOG.md for complete details.

Core Components

  1. Source: An interface implemented by the user to define where data comes from (e.g. a channel, database, API, or file system).
  2. Processor: An interface implemented by the user to define how batches of data should be processed. Multiple processors can be chained together to create a processing pipeline.
  3. Batch: The central structure provided by GoBatch that manages the batch processing pipeline.

The Batch Processing Pipeline

  1. Data Reading:

    • The Source implementation reads data from its origin and returns two channels: data and errors.
    • Data items are sent to the Batch structure via these channels.
  2. Batching:

    • The Batch structure queues incoming items.
    • It determines when to form a batch based on configured criteria (time elapsed, number of items, etc.).
  3. Processing:

    • When a batch is ready, Batch sends it to the Processor implementation(s).
    • Each processor in the chain performs operations on the batch and passes the results to the next processor.
    • Individual item errors are tracked within the Item struct.
  4. Result Handling:

    • Processed results and any errors are managed by the Batch structure.
    • Errors can come from the Source, Processor, or individual items.
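
The batching step can be pictured with a small, self-contained sketch. This is not GoBatch's implementation: batchBy and collectBatches are hypothetical names, and the rule shown (flush when maxItems are queued or when a periodic tick fires) is a simplified stand-in for the configured criteria described above.

```go
package main

import (
	"fmt"
	"time"
)

// batchBy (hypothetical) groups values from in into slices. A batch is
// emitted when it holds maxItems values, when the ticker fires while
// values are queued, or when in is closed.
func batchBy(in <-chan int, maxItems int, maxWait time.Duration) <-chan []int {
	out := make(chan []int)
	go func() {
		defer close(out)
		ticker := time.NewTicker(maxWait)
		defer ticker.Stop()

		var cur []int
		flush := func() {
			if len(cur) > 0 {
				out <- cur
				cur = nil
			}
		}
		for {
			select {
			case v, ok := <-in:
				if !ok {
					flush() // emit whatever is left, then stop
					return
				}
				cur = append(cur, v)
				if len(cur) >= maxItems {
					flush()
				}
			case <-ticker.C:
				flush()
			}
		}
	}()
	return out
}

// collectBatches feeds values through batchBy with a very long tick, so
// only the size rule and the final flush apply.
func collectBatches(values []int, maxItems int) [][]int {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, v := range values {
			in <- v
		}
	}()
	var got [][]int
	for b := range batchBy(in, maxItems, time.Hour) {
		got = append(got, b)
	}
	return got
}

func main() {
	fmt.Println(collectBatches([]int{1, 2, 3, 4, 5}, 2)) // prints [[1 2] [3 4] [5]]
}
```

In GoBatch itself, this grouping is driven by the Config values (MinItems, MaxItems, MinTime, MaxTime) rather than by hand-written loops like the one above.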

Typical Use Cases

GoBatch applies to many scenarios where processing items in batches is beneficial. Potential use cases include:

  • Database Operations: Optimize inserts, updates, or reads by batching operations.
  • Log Processing: Efficiently process log entries in batches for analysis or storage.
  • File Processing: Process large files in manageable chunks for better performance.
  • Cache Updates: Reduce network overhead by batching cache updates.
  • Message Queue Consumption: Process messages from queues in batches.
  • Bulk Data Validation: Validate large datasets in parallel batches for faster results.

By batching operations, you can reduce network overhead, optimize resource utilization, and improve overall system performance.

Installation

To download, run:

go get github.com/MasterOfBinary/gobatch

Requirements

  • Go 1.18 or later is required.

Key Components

  • Batch[T]: The main struct that manages batch processing for item type T.
  • Source[T]: Provides data by implementing Read(ctx) (<-chan T, <-chan error).
  • Processor[T]: Processes batches by implementing Process(ctx, []*Item[T]) ([]*Item[T], error).
  • Config: Provides dynamic configuration values.
  • Item[T]: Represents a single typed data item with a unique ID and an optional error.
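
To make the two interfaces concrete, here is a minimal sketch of a custom source and processor. It declares local stand-ins for Item, Source, and Processor so that it compiles without importing the library. The method signatures follow the list above, but the Item field names (ID, Data, Error) and the convention of closing both channels when reading is finished are assumptions. sliceSource, upperProcessor, and run are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// Item is a local stand-in for Item[T]; the field names are assumptions.
type Item[T any] struct {
	ID    uint64
	Data  T
	Error error
}

// Source and Processor restate the method signatures listed above.
type Source[T any] interface {
	Read(ctx context.Context) (<-chan T, <-chan error)
}

type Processor[T any] interface {
	Process(ctx context.Context, items []*Item[T]) ([]*Item[T], error)
}

// sliceSource (hypothetical) emits the elements of a slice, then closes
// both channels so the reader knows it is finished.
type sliceSource[T any] struct{ values []T }

func (s sliceSource[T]) Read(ctx context.Context) (<-chan T, <-chan error) {
	out, errs := make(chan T), make(chan error)
	go func() {
		defer close(out)
		defer close(errs)
		for _, v := range s.values {
			select {
			case out <- v:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out, errs
}

// upperProcessor (hypothetical) upper-cases each string and records an
// item-level error for empty strings instead of failing the whole batch.
type upperProcessor struct{}

func (upperProcessor) Process(_ context.Context, items []*Item[string]) ([]*Item[string], error) {
	for _, it := range items {
		if it.Data == "" {
			it.Error = errors.New("empty value")
			continue
		}
		it.Data = strings.ToUpper(it.Data)
	}
	return items, nil
}

// run wires the two together by hand and discards source errors. In real
// use, Batch would do the reading, batching, and ID assignment.
func run(src Source[string], proc Processor[string]) ([]*Item[string], error) {
	ctx := context.Background()
	data, errs := src.Read(ctx)
	var items []*Item[string]
	for data != nil || errs != nil {
		select {
		case v, ok := <-data:
			if !ok {
				data = nil
				continue
			}
			items = append(items, &Item[string]{ID: uint64(len(items)), Data: v})
		case _, ok := <-errs:
			if !ok {
				errs = nil
			}
		}
	}
	return proc.Process(ctx, items)
}

func main() {
	items, err := run(sliceSource[string]{values: []string{"a", "", "c"}}, upperProcessor{})
	if err != nil {
		fmt.Println("processor error:", err)
	}
	for _, it := range items {
		fmt.Printf("id=%d data=%q err=%v\n", it.ID, it.Data, it.Error)
	}
}
```

Setting an error on a single item, rather than returning one from Process, lets the rest of the batch continue through the chain.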

Built-in Processors

  • Filter: Filters items based on a predicate function.
  • Transform: Transforms item data with a custom function.
  • Error: Simulates processor errors for testing.
  • Nil: Passes items through unchanged for benchmarking.
  • Channel: Writes item data to an output channel.

Built-in Sources

  • Channel: Uses Go channels as sources.
  • Error: Simulates error-only sources for testing.
  • Nil: Emits no data for timing tests.

Helper Functions

  • IgnoreErrors: Drains the error channel in the background, allowing you to call Done() without handling errors immediately.
  • CollectErrors: Collects all errors into a slice after batch processing finishes.
  • RunBatchAndWait: Starts a batch, waits for completion, and returns all collected errors.
  • ExecuteBatches: Runs multiple batches concurrently and collects all errors into a single slice.
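
The difference between ignoring and collecting errors comes down to how the error channel is drained. The sketch below uses hypothetical local functions (collectErrors, ignoreErrors, produce) to illustrate the two patterns on a plain channel. It is not the library's code.

```go
package main

import (
	"fmt"
)

// collectErrors (hypothetical) blocks until errs is closed and returns
// every error received, in order.
func collectErrors(errs <-chan error) []error {
	var out []error
	for err := range errs {
		out = append(out, err)
	}
	return out
}

// ignoreErrors (hypothetical) drains errs in the background so that the
// producer never blocks on an unread error.
func ignoreErrors(errs <-chan error) {
	go func() {
		for range errs {
		}
	}()
}

// produce emits n errors on an unbuffered channel and then closes it,
// standing in for a running pipeline.
func produce(n int) <-chan error {
	errs := make(chan error)
	go func() {
		defer close(errs)
		for i := 0; i < n; i++ {
			errs <- fmt.Errorf("error %d", i)
		}
	}()
	return errs
}

func main() {
	ignoreErrors(produce(3)) // returns immediately; errors are discarded

	collected := collectErrors(produce(3)) // blocks until the channel closes
	fmt.Println(len(collected))            // prints 3
}
```

Either way, something must read the channel. If nothing reads it, a producer that reports an error on an unbuffered or full channel will block.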

Basic Usage

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/MasterOfBinary/gobatch/batch"
	"github.com/MasterOfBinary/gobatch/processor"
	"github.com/MasterOfBinary/gobatch/source"
)

func main() {
	// Create a batch processor with simple config
	b := batch.New[int](batch.NewConstantConfig(&batch.ConfigValues{
		MinItems: 2,
		MaxItems: 5,
		MinTime:  10 * time.Millisecond,
		MaxTime:  100 * time.Millisecond,
	}))

	// Create an input channel
	ch := make(chan int)

	// Wrap it with a source.Channel
	src := &source.Channel[int]{Input: ch}

	// First processor: double each number
	doubleProc := &processor.Transform[int]{
		Func: func(data int) (int, error) {
			return data * 2, nil
		},
	}

	// Second processor: print each processed number
	printProc := &processor.Transform[int]{
		Func: func(data int) (int, error) {
			fmt.Println(data)
			return data, nil
		},
	}

	ctx := context.Background()

	// Start batch processing with processors chained
	errs, err := b.Go(ctx, src, doubleProc, printProc)
	if err != nil {
		log.Fatal(err)
	}

	// Ignore errors for this simple example
	batch.IgnoreErrors(errs)

	// Send some items to the input channel
	go func() {
		for i := 1; i <= 5; i++ {
			ch <- i
		}
		close(ch)
	}()

	// Wait for processing to complete
	<-b.Done()
}

Expected output:

2
4
6
8
10

Configuration

GoBatch supports flexible configuration through the Config interface, which defines how batches are formed based on size and timing rules.

You can choose between:

  • ConstantConfig for static, unchanging settings.
  • DynamicConfig for runtime-adjustable settings that can be updated while processing.

Passing a nil Config to New (or using the zero-value &Batch[T]{}) uses a default configuration, where items are processed immediately as they are read.

Configuration options include:

  • MinItems: Minimum number of items to process in a batch.
  • MaxItems: Maximum number of items to process in a batch.
  • MinTime: Minimum time to wait before processing a batch.
  • MaxTime: Maximum time to wait before processing a batch.

The configuration is automatically adjusted to keep it consistent:

  • If MinItems > MaxItems, MinItems will be set to MaxItems.
  • If MinTime > MaxTime, MinTime will be set to MaxTime.
  • If MinItems is 0, it will be set to 1.
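
The three adjustment rules can be written out as a small function. This is a sketch on a local copy of the struct, with assumed field types, applying the rules in the order listed. How GoBatch treats a zero MaxItems or MaxTime is not stated here, so the example assumes both maximums are set. normalize is a hypothetical name.

```go
package main

import (
	"fmt"
	"time"
)

// ConfigValues is a local copy of the four options above; the field
// types are assumptions.
type ConfigValues struct {
	MinItems uint64
	MaxItems uint64
	MinTime  time.Duration
	MaxTime  time.Duration
}

// normalize (hypothetical) applies the three consistency rules in the
// order they are listed.
func normalize(c ConfigValues) ConfigValues {
	if c.MinItems > c.MaxItems {
		c.MinItems = c.MaxItems
	}
	if c.MinTime > c.MaxTime {
		c.MinTime = c.MaxTime
	}
	if c.MinItems == 0 {
		c.MinItems = 1
	}
	return c
}

func main() {
	got := normalize(ConfigValues{
		MinItems: 10,
		MaxItems: 5,
		MinTime:  2 * time.Second,
		MaxTime:  time.Second,
	})
	fmt.Println(got.MinItems, got.MaxItems, got.MinTime, got.MaxTime) // prints 5 5 1s 1s
}
```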

Example: Constant Configuration

config := batch.NewConstantConfig(&batch.ConfigValues{
    MinItems: 10,
    MaxItems: 100,
    MinTime:  50 * time.Millisecond,
    MaxTime:  500 * time.Millisecond,
})

batchProcessor := batch.New[int](config)

Example: Dynamic Configuration

DynamicConfig allows you to adjust batch parameters at runtime, for example, based on system load.

// Create dynamic configuration
dynConfig := batch.NewDynamicConfig(&batch.ConfigValues{
    MinItems: 10,
    MaxItems: 100,
    MinTime:  50 * time.Millisecond,
    MaxTime:  500 * time.Millisecond,
})

batchProcessor := batch.New[int](dynConfig)

// ... start processing ...

// Later, update the configuration based on new requirements
dynConfig.UpdateBatchSize(20, 200)
dynConfig.UpdateTiming(100 * time.Millisecond, 1 * time.Second)

Buffer Configuration

You can fine-tune performance by customizing the internal channel buffer sizes with WithBufferConfig. This is useful for high-throughput scenarios or bursty traffic.

// Configure custom buffer sizes
batchProcessor := batch.New[int](config).WithBufferConfig(batch.BufferConfig{
    ItemBufferSize:  1000, // Buffer for incoming items
    ErrorBufferSize: 500,  // Buffer for error reporting
})

Error Handling

Errors can come from three sources:

  1. Source errors: Errors returned from Source.Read().
  2. Processor errors: Errors returned from Processor.Process().
  3. Item-specific errors: Errors set on individual Item.Error fields.

All errors are reported through the error channel returned by the Go method.

Example error handling:

import (
	"errors"
	"log"

	"github.com/MasterOfBinary/gobatch/batch"
)

go func() {
	for err := range errs {
		var srcErr *batch.SourceError
		var procErr *batch.ProcessorError
		switch {
		case errors.As(err, &srcErr):
			log.Printf("Source error: %v", srcErr.Unwrap())
		case errors.As(err, &procErr):
			log.Printf("Processor error: %v", procErr.Unwrap())
		default:
			log.Printf("Error: %v", err)
		}
	}
}()

Or using helper functions:

// Option 1: start the batch, then collect all errors
// (CollectErrors blocks until processing completes)
pipeErrs, err := batchProcessor.Go(ctx, src, proc)
if err != nil {
    // Handle start error (e.g. batch.ErrNilSource, batch.ErrBatchUsed)
    log.Fatal(err)
}
errs := batch.CollectErrors(pipeErrs)

// Option 2 (use instead of Option 1, since a Batch is single-use):
// the RunBatchAndWait helper, which folds a start error into the slice
// errs := batch.RunBatchAndWait(ctx, batchProcessor, src, proc)

for _, err := range errs {
    // Handle error
}

Batch Lifecycle

A Batch is single-use: call Go exactly once per Batch. Calling Go again returns batch.ErrBatchUsed (along with a closed, drainable error channel) instead of starting a second run, so create a fresh Batch with New for each run. Go also returns batch.ErrNilSource when the source is nil. Both errors can be checked with errors.Is.
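
The single-use contract and the errors.Is checks can be illustrated with a local stand-in. The runner type, its start method, the errUsed and errNilSource sentinels, and classify below are all hypothetical. They mimic the behaviour described above (a sentinel error plus an already-closed error channel) and do not reproduce the library's code. Whether a nil source also marks the batch as used is not stated, so this sketch assumes it does not.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Hypothetical sentinels standing in for batch.ErrBatchUsed and
// batch.ErrNilSource.
var (
	errUsed      = errors.New("batch already used")
	errNilSource = errors.New("nil source")
)

// runner (hypothetical) can be started at most once.
type runner struct {
	mu   sync.Mutex
	used bool
}

// start returns an error channel in every case. On failure the channel
// is already closed, so callers can range over it without blocking.
func (r *runner) start(src <-chan int) (<-chan error, error) {
	errs := make(chan error)
	if src == nil {
		close(errs)
		return errs, errNilSource
	}
	r.mu.Lock()
	alreadyUsed := r.used
	r.used = true
	r.mu.Unlock()
	if alreadyUsed {
		close(errs)
		return errs, errUsed
	}
	go func() {
		defer close(errs)
		for range src { // consume the source, reporting no errors
		}
	}()
	return errs, nil
}

// classify shows the caller-side check with errors.Is.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errUsed):
		return "used"
	case errors.Is(err, errNilSource):
		return "nil source"
	default:
		return "other"
	}
}

func main() {
	src := make(chan int)
	close(src)

	r := &runner{}
	_, err := r.start(src)
	fmt.Println(classify(err)) // prints ok
	_, err = r.start(src)
	fmt.Println(classify(err)) // prints used
}
```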

Documentation

See the pkg.go.dev docs for documentation and examples.

Testing

Run tests with:

go test github.com/MasterOfBinary/gobatch/...

Future Roadmap

  • Sync-like Batching: Support for request-reply patterns where batched operations appear synchronous to the caller.

Contributing

Contributions are welcome! Feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.
