Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions bindings/elixir/lib/fluss/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ defmodule Fluss.Config do

@enforce_keys [:bootstrap_servers]
defstruct bootstrap_servers: nil,
remote_file_download_thread_num: nil,
scanner_log_fetch_max_bytes: nil,
scanner_log_fetch_max_bytes_for_bucket: nil,
scanner_log_fetch_min_bytes: nil,
scanner_log_fetch_wait_max_time_ms: nil,
scanner_log_max_poll_records: nil,
scanner_remote_log_prefetch_num: nil,
scanner_remote_log_read_concurrency: nil,
writer_acks: nil,
writer_batch_size: nil,
writer_batch_timeout_ms: nil,
Expand All @@ -48,6 +56,14 @@ defmodule Fluss.Config do

@type t :: %__MODULE__{
bootstrap_servers: String.t(),
remote_file_download_thread_num: non_neg_integer() | nil,
scanner_log_fetch_max_bytes: non_neg_integer() | nil,
scanner_log_fetch_max_bytes_for_bucket: non_neg_integer() | nil,
scanner_log_fetch_min_bytes: non_neg_integer() | nil,
scanner_log_fetch_wait_max_time_ms: non_neg_integer() | nil,
scanner_log_max_poll_records: non_neg_integer() | nil,
scanner_remote_log_prefetch_num: non_neg_integer() | nil,
scanner_remote_log_read_concurrency: non_neg_integer() | nil,
writer_acks: String.t() | nil,
writer_batch_size: non_neg_integer() | nil,
writer_batch_timeout_ms: non_neg_integer() | nil,
Expand All @@ -74,6 +90,44 @@ defmodule Fluss.Config do
def set_bootstrap_servers(%__MODULE__{} = config, servers) when is_binary(servers),
do: %{config | bootstrap_servers: servers}

@spec set_remote_file_download_thread_num(t(), non_neg_integer()) :: t()
def set_remote_file_download_thread_num(%__MODULE__{} = config, threads)
when is_integer(threads),
Copy link
Copy Markdown
Member

@fresh-borzoni fresh-borzoni May 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: do you mind to tighten the guards to match type specs we use?
here and in other setters through config

do: %{config | remote_file_download_thread_num: threads}

@spec set_scanner_log_fetch_max_bytes(t(), non_neg_integer()) :: t()
def set_scanner_log_fetch_max_bytes(%__MODULE__{} = config, max_bytes)
when is_integer(max_bytes),
do: %{config | scanner_log_fetch_max_bytes: max_bytes}

@spec set_scanner_log_fetch_max_bytes_for_bucket(t(), non_neg_integer()) :: t()
def set_scanner_log_fetch_max_bytes_for_bucket(%__MODULE__{} = config, max_bytes)
when is_integer(max_bytes),
do: %{config | scanner_log_fetch_max_bytes_for_bucket: max_bytes}

@spec set_scanner_log_fetch_min_bytes(t(), non_neg_integer()) :: t()
def set_scanner_log_fetch_min_bytes(%__MODULE__{} = config, min_bytes)
when is_integer(min_bytes),
do: %{config | scanner_log_fetch_min_bytes: min_bytes}

@spec set_scanner_log_fetch_wait_max_time_ms(t(), non_neg_integer()) :: t()
def set_scanner_log_fetch_wait_max_time_ms(%__MODULE__{} = config, wait_ms)
when is_integer(wait_ms),
do: %{config | scanner_log_fetch_wait_max_time_ms: wait_ms}

@spec set_scanner_log_max_poll_records(t(), non_neg_integer()) :: t()
def set_scanner_log_max_poll_records(%__MODULE__{} = config, num) when is_integer(num),
do: %{config | scanner_log_max_poll_records: num}

@spec set_scanner_remote_log_prefetch_num(t(), non_neg_integer()) :: t()
def set_scanner_remote_log_prefetch_num(%__MODULE__{} = config, num) when is_integer(num),
do: %{config | scanner_remote_log_prefetch_num: num}

@spec set_scanner_remote_log_read_concurrency(t(), non_neg_integer()) :: t()
def set_scanner_remote_log_read_concurrency(%__MODULE__{} = config, concurrency)
when is_integer(concurrency),
do: %{config | scanner_remote_log_read_concurrency: concurrency}

@spec set_writer_acks(t(), String.t()) :: t()
def set_writer_acks(%__MODULE__{} = config, acks) when is_binary(acks),
do: %{config | writer_acks: acks}
Expand Down
32 changes: 32 additions & 0 deletions bindings/elixir/native/fluss_nif/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ pub enum NifNoKeyAssigner {
#[module = "Fluss.Config"]
pub struct NifConfig {
pub bootstrap_servers: String,
pub remote_file_download_thread_num: Option<u64>,
pub scanner_log_fetch_max_bytes: Option<i32>,
pub scanner_log_fetch_max_bytes_for_bucket: Option<i32>,
pub scanner_log_fetch_min_bytes: Option<i32>,
pub scanner_log_fetch_wait_max_time_ms: Option<i32>,
pub scanner_log_max_poll_records: Option<u64>,
pub scanner_remote_log_prefetch_num: Option<u64>,
pub scanner_remote_log_read_concurrency: Option<u64>,
pub writer_acks: Option<String>,
pub writer_batch_size: Option<i32>,
pub writer_batch_timeout_ms: Option<i64>,
Expand All @@ -51,6 +59,30 @@ impl NifConfig {
bootstrap_servers: self.bootstrap_servers,
..Config::default()
};
if let Some(n) = self.remote_file_download_thread_num {
config.remote_file_download_thread_num = n as usize;
}
if let Some(size) = self.scanner_log_fetch_max_bytes {
config.scanner_log_fetch_max_bytes = size;
}
if let Some(size) = self.scanner_log_fetch_max_bytes_for_bucket {
config.scanner_log_fetch_max_bytes_for_bucket = size;
}
if let Some(size) = self.scanner_log_fetch_min_bytes {
config.scanner_log_fetch_min_bytes = size;
}
if let Some(ms) = self.scanner_log_fetch_wait_max_time_ms {
config.scanner_log_fetch_wait_max_time_ms = ms;
}
if let Some(n) = self.scanner_log_max_poll_records {
config.scanner_log_max_poll_records = n as usize;
}
if let Some(n) = self.scanner_remote_log_prefetch_num {
config.scanner_remote_log_prefetch_num = n as usize;
}
if let Some(n) = self.scanner_remote_log_read_concurrency {
config.scanner_remote_log_read_concurrency = n as usize;
}
if let Some(size) = self.writer_batch_size {
config.writer_batch_size = size;
}
Expand Down
64 changes: 64 additions & 0 deletions bindings/elixir/test/config_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,70 @@ defmodule Fluss.ConfigTest do
assert config == %Fluss.Config{bootstrap_servers: "localhost:9123"}
end

test "set_remote_file_download_thread_num/2 sets the download thread num" do
config =
Fluss.Config.new("localhost:9123")
|> Fluss.Config.set_remote_file_download_thread_num(4)

assert config.remote_file_download_thread_num == 4
end

test "set_scanner_log_fetch_max_bytes/2 sets the fetch max bytes" do
config =
Fluss.Config.new("localhost:9123")
|> Fluss.Config.set_scanner_log_fetch_max_bytes(16_777_216)

assert config.scanner_log_fetch_max_bytes == 16_777_216
end

test "set_scanner_log_fetch_max_bytes_for_bucket/2 sets the per-bucket fetch limit" do
config =
Fluss.Config.new("localhost:9123")
|> Fluss.Config.set_scanner_log_fetch_max_bytes_for_bucket(1_048_576)

assert config.scanner_log_fetch_max_bytes_for_bucket == 1_048_576
end

test "set_scanner_log_fetch_min_bytes/2 sets the fetch min bytes" do
config =
Fluss.Config.new("localhost:9123")
|> Fluss.Config.set_scanner_log_fetch_min_bytes(1)

assert config.scanner_log_fetch_min_bytes == 1
end

test "set_scanner_log_fetch_wait_max_time_ms/2 sets the max wait time" do
config =
Fluss.Config.new("localhost:9123")
|> Fluss.Config.set_scanner_log_fetch_wait_max_time_ms(500)

assert config.scanner_log_fetch_wait_max_time_ms == 500
end

test "set_scanner_log_max_poll_records/2 sets the max poll records" do
config =
Fluss.Config.new("localhost:9123")
|> Fluss.Config.set_scanner_log_max_poll_records(1000)

assert config.scanner_log_max_poll_records == 1000
end

test "set_scanner_remote_log_prefetch_num/2 sets the prefetch num" do
config =
Fluss.Config.new("localhost:9123")
|> Fluss.Config.set_scanner_remote_log_prefetch_num(2)

assert config.scanner_remote_log_prefetch_num == 2
end

test "set_scanner_remote_log_read_concurrency/2 sets the read concurrency" do
config =
Fluss.Config.new("localhost:9123")
|> Fluss.Config.set_scanner_remote_log_read_concurrency(4)

assert config.scanner_remote_log_read_concurrency == 4
end

test "set_writer_acks/2 sets the acks value" do
config =
Fluss.Config.new("localhost:9123")
Expand Down
Loading