diff --git a/bindings/elixir/lib/fluss/config.ex b/bindings/elixir/lib/fluss/config.ex index d02a428f..07324df0 100644 --- a/bindings/elixir/lib/fluss/config.ex +++ b/bindings/elixir/lib/fluss/config.ex @@ -33,6 +33,14 @@ defmodule Fluss.Config do @enforce_keys [:bootstrap_servers] defstruct bootstrap_servers: nil, + remote_file_download_thread_num: nil, + scanner_log_fetch_max_bytes: nil, + scanner_log_fetch_max_bytes_for_bucket: nil, + scanner_log_fetch_min_bytes: nil, + scanner_log_fetch_wait_max_time_ms: nil, + scanner_log_max_poll_records: nil, + scanner_remote_log_prefetch_num: nil, + scanner_remote_log_read_concurrency: nil, writer_acks: nil, writer_batch_size: nil, writer_batch_timeout_ms: nil, @@ -48,6 +56,14 @@ defmodule Fluss.Config do @type t :: %__MODULE__{ bootstrap_servers: String.t(), + remote_file_download_thread_num: non_neg_integer() | nil, + scanner_log_fetch_max_bytes: non_neg_integer() | nil, + scanner_log_fetch_max_bytes_for_bucket: non_neg_integer() | nil, + scanner_log_fetch_min_bytes: non_neg_integer() | nil, + scanner_log_fetch_wait_max_time_ms: non_neg_integer() | nil, + scanner_log_max_poll_records: non_neg_integer() | nil, + scanner_remote_log_prefetch_num: non_neg_integer() | nil, + scanner_remote_log_read_concurrency: non_neg_integer() | nil, writer_acks: String.t() | nil, writer_batch_size: non_neg_integer() | nil, writer_batch_timeout_ms: non_neg_integer() | nil, @@ -74,6 +90,44 @@ defmodule Fluss.Config do def set_bootstrap_servers(%__MODULE__{} = config, servers) when is_binary(servers), do: %{config | bootstrap_servers: servers} + @spec set_remote_file_download_thread_num(t(), non_neg_integer()) :: t() + def set_remote_file_download_thread_num(%__MODULE__{} = config, threads) + when is_integer(threads), + do: %{config | remote_file_download_thread_num: threads} + + @spec set_scanner_log_fetch_max_bytes(t(), non_neg_integer()) :: t() + def set_scanner_log_fetch_max_bytes(%__MODULE__{} = config, max_bytes) + when is_integer(max_bytes), + do: %{config | scanner_log_fetch_max_bytes: max_bytes} + + @spec set_scanner_log_fetch_max_bytes_for_bucket(t(), non_neg_integer()) :: t() + def set_scanner_log_fetch_max_bytes_for_bucket(%__MODULE__{} = config, max_bytes) + when is_integer(max_bytes), + do: %{config | scanner_log_fetch_max_bytes_for_bucket: max_bytes} + + @spec set_scanner_log_fetch_min_bytes(t(), non_neg_integer()) :: t() + def set_scanner_log_fetch_min_bytes(%__MODULE__{} = config, min_bytes) + when is_integer(min_bytes), + do: %{config | scanner_log_fetch_min_bytes: min_bytes} + + @spec set_scanner_log_fetch_wait_max_time_ms(t(), non_neg_integer()) :: t() + def set_scanner_log_fetch_wait_max_time_ms(%__MODULE__{} = config, wait_ms) + when is_integer(wait_ms), + do: %{config | scanner_log_fetch_wait_max_time_ms: wait_ms} + + @spec set_scanner_log_max_poll_records(t(), non_neg_integer()) :: t() + def set_scanner_log_max_poll_records(%__MODULE__{} = config, num) when is_integer(num), + do: %{config | scanner_log_max_poll_records: num} + + @spec set_scanner_remote_log_prefetch_num(t(), non_neg_integer()) :: t() + def set_scanner_remote_log_prefetch_num(%__MODULE__{} = config, num) when is_integer(num), + do: %{config | scanner_remote_log_prefetch_num: num} + + @spec set_scanner_remote_log_read_concurrency(t(), non_neg_integer()) :: t() + def set_scanner_remote_log_read_concurrency(%__MODULE__{} = config, concurrency) + when is_integer(concurrency), + do: %{config | scanner_remote_log_read_concurrency: concurrency} + @spec set_writer_acks(t(), String.t()) :: t() def set_writer_acks(%__MODULE__{} = config, acks) when is_binary(acks), do: %{config | writer_acks: acks} diff --git a/bindings/elixir/native/fluss_nif/src/config.rs b/bindings/elixir/native/fluss_nif/src/config.rs index 60034d9b..79aabcd7 100644 --- a/bindings/elixir/native/fluss_nif/src/config.rs +++ b/bindings/elixir/native/fluss_nif/src/config.rs @@ -31,6 +31,14 @@ pub enum NifNoKeyAssigner { #[module = "Fluss.Config"] pub struct NifConfig { pub bootstrap_servers: String, + pub remote_file_download_thread_num: Option, + pub scanner_log_fetch_max_bytes: Option, + pub scanner_log_fetch_max_bytes_for_bucket: Option, + pub scanner_log_fetch_min_bytes: Option, + pub scanner_log_fetch_wait_max_time_ms: Option, + pub scanner_log_max_poll_records: Option, + pub scanner_remote_log_prefetch_num: Option, + pub scanner_remote_log_read_concurrency: Option, pub writer_acks: Option, pub writer_batch_size: Option, pub writer_batch_timeout_ms: Option, @@ -51,6 +59,30 @@ impl NifConfig { bootstrap_servers: self.bootstrap_servers, ..Config::default() }; + if let Some(n) = self.remote_file_download_thread_num { + config.remote_file_download_thread_num = n as usize; + } + if let Some(size) = self.scanner_log_fetch_max_bytes { + config.scanner_log_fetch_max_bytes = size; + } + if let Some(size) = self.scanner_log_fetch_max_bytes_for_bucket { + config.scanner_log_fetch_max_bytes_for_bucket = size; + } + if let Some(size) = self.scanner_log_fetch_min_bytes { + config.scanner_log_fetch_min_bytes = size; + } + if let Some(ms) = self.scanner_log_fetch_wait_max_time_ms { + config.scanner_log_fetch_wait_max_time_ms = ms; + } + if let Some(n) = self.scanner_log_max_poll_records { + config.scanner_log_max_poll_records = n as usize; + } + if let Some(n) = self.scanner_remote_log_prefetch_num { + config.scanner_remote_log_prefetch_num = n as usize; + } + if let Some(n) = self.scanner_remote_log_read_concurrency { + config.scanner_remote_log_read_concurrency = n as usize; + } if let Some(size) = self.writer_batch_size { config.writer_batch_size = size; } diff --git a/bindings/elixir/test/config_test.exs b/bindings/elixir/test/config_test.exs index 344c6478..2550a7e8 100644 --- a/bindings/elixir/test/config_test.exs +++ b/bindings/elixir/test/config_test.exs @@ -23,6 +23,70 @@ defmodule Fluss.ConfigTest do assert config == %Fluss.Config{bootstrap_servers: "localhost:9123"} end + test "set_remote_file_download_thread_num/2 sets the download thread num" do + config = + Fluss.Config.new("localhost:9123") + |> Fluss.Config.set_remote_file_download_thread_num(4) + + assert config.remote_file_download_thread_num == 4 + end + + test "set_scanner_log_fetch_max_bytes/2 sets the fetch max bytes" do + config = + Fluss.Config.new("localhost:9123") + |> Fluss.Config.set_scanner_log_fetch_max_bytes(16_777_216) + + assert config.scanner_log_fetch_max_bytes == 16_777_216 + end + + test "set_scanner_log_fetch_max_bytes_for_bucket/2 sets the per-bucket fetch limit" do + config = + Fluss.Config.new("localhost:9123") + |> Fluss.Config.set_scanner_log_fetch_max_bytes_for_bucket(1_048_576) + + assert config.scanner_log_fetch_max_bytes_for_bucket == 1_048_576 + end + + test "set_scanner_log_fetch_min_bytes/2 sets the fetch min bytes" do + config = + Fluss.Config.new("localhost:9123") + |> Fluss.Config.set_scanner_log_fetch_min_bytes(1) + + assert config.scanner_log_fetch_min_bytes == 1 + end + + test "set_scanner_log_fetch_wait_max_time_ms/2 sets the max wait time" do + config = + Fluss.Config.new("localhost:9123") + |> Fluss.Config.set_scanner_log_fetch_wait_max_time_ms(500) + + assert config.scanner_log_fetch_wait_max_time_ms == 500 + end + + test "set_scanner_log_max_poll_records/2 sets the max poll records" do + config = + Fluss.Config.new("localhost:9123") + |> Fluss.Config.set_scanner_log_max_poll_records(1000) + + assert config.scanner_log_max_poll_records == 1000 + end + + test "set_scanner_remote_log_prefetch_num/2 sets the prefetch num" do + config = + Fluss.Config.new("localhost:9123") + |> Fluss.Config.set_scanner_remote_log_prefetch_num(2) + + assert config.scanner_remote_log_prefetch_num == 2 + end + + test "set_scanner_remote_log_read_concurrency/2 sets the read concurrency" do + config = + Fluss.Config.new("localhost:9123") + |> Fluss.Config.set_scanner_remote_log_read_concurrency(4) + + assert config.scanner_remote_log_read_concurrency == 4 + end + test "set_writer_acks/2 sets the acks value" do config = Fluss.Config.new("localhost:9123")