diff --git a/CHANGELOG.md b/CHANGELOG.md index 5e28fdb..df57a16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 2.0.0 - 2025-06-22 + +- Add named pool support. + ## 1.0.1 - 2025-06-22 - Remove extraneous dependency on [Lifeguard](https://github.com/Pevensie/lifeguard) diff --git a/README.md b/README.md index 914aee4..7ce9d41 100644 --- a/README.md +++ b/README.md @@ -27,21 +27,17 @@ import gleam/otp/static_supervisor as supervisor import valkyrie pub fn main() { - // Create a subject to receive the connection once the supervision tree has been - // started. Use a named subject to make sure we can always receive the connection, - // even if our original process crashes. - let conn_receiver_name = process.new_name("valkyrie_conn_receiver") - let assert Ok(_) = process.register(process.self(), conn_receiver_name) - - let conn_receiver = process.named_subject(conn_receiver_name) + // Create a name to interact with the connection pool once it's started under the + // static supervisor. + let pool_name = process.new_name("connection_pool") // Define a pool of 10 connections to the default Redis instance on localhost. let valkyrie_child_spec = valkyrie.default_config() |> valkyrie.supervised_pool( - receiver: conn_receiver, size: 10, - timeout: 1000 + name: option.Some(pool_name), + timeout: 1000, ) // Start the pool under a supervisor @@ -50,8 +46,8 @@ pub fn main() { |> supervisor.add(valkyrie_child_spec) |> supervisor.start - // Receive the connection now that the pool is started - let assert Ok(conn) = process.receive(conn_receiver, 1000) + // Get the connection now that the pool is started + let conn = valkyrie.named_connection(pool_name) // Use the connection. let assert Ok(_) = echo valkyrie.set(conn, "key", "value", option.None, 1000) diff --git a/dev/valkyrie/example.gleam b/dev/valkyrie/example.gleam index 85f71f0..086d062 100644 --- a/dev/valkyrie/example.gleam +++ b/dev/valkyrie/example.gleam @@ -4,20 +4,16 @@ import gleam/otp/static_supervisor as supervisor import valkyrie pub fn main() { - // Create a subject to receive the connection once the supervision tree has been - // started. Use a named subject to make sure we can always receive the connection, - // even if our original process crashes. - let conn_receiver_name = process.new_name("valkyrie_conn_receiver") - let assert Ok(_) = process.register(process.self(), conn_receiver_name) - - let conn_receiver = process.named_subject(conn_receiver_name) + // Create a name to interact with the connection pool once it's started under the + // static supervisor. + let pool_name = process.new_name("connection_pool") // Define a pool of 10 connections to the default Redis instance on localhost. let valkyrie_child_spec = valkyrie.default_config() |> valkyrie.supervised_pool( - receiver: conn_receiver, size: 10, + name: option.Some(pool_name), timeout: 1000, ) @@ -27,8 +23,8 @@ pub fn main() { |> supervisor.add(valkyrie_child_spec) |> supervisor.start - // Receive the connection now that the pool is started - let assert Ok(conn) = process.receive(conn_receiver, 1000) + // Get the connection now that the pool is started + let conn = valkyrie.named_connection(pool_name) // Use the connection. let assert Ok(_) = echo valkyrie.set(conn, "key", "value", option.None, 1000) diff --git a/gleam.toml b/gleam.toml index 11c4a71..9ac61fb 100644 --- a/gleam.toml +++ b/gleam.toml @@ -1,5 +1,5 @@ name = "valkyrie" -version = "1.0.1" +version = "2.0.0" description = "A Gleam client for Valkey, KeyDB, Redis, Dragonfly and other Redis-compatible databases." gleam = ">= 1.11.0" @@ -17,7 +17,7 @@ gleam_erlang = ">= 1.0.0 and < 2.0.0" gleam_otp = ">= 1.0.0 and < 2.0.0" gleam_stdlib = ">= 0.60.0 and < 2.0.0" mug = ">= 2.0.0 and < 3.0.0" -bath = ">= 4.1.0 and < 5.0.0" +bath = ">= 5.0.0 and < 6.0.0" gleam_time = ">= 1.2.0 and < 2.0.0" [dev-dependencies] diff --git a/manifest.toml b/manifest.toml index 2d65323..802a475 100644 --- a/manifest.toml +++ b/manifest.toml @@ -2,7 +2,7 @@ # You typically do not need to edit this file packages = [ - { name = "bath", version = "4.1.0", build_tools = ["gleam"], requirements = ["gleam_deque", "gleam_erlang", "gleam_otp", "gleam_stdlib", "logging"], otp_app = "bath", source = "hex", outer_checksum = "96C6CC44FBFF8AA31C9225E863E12FB406C0CD71DC3DFAB657BD7899F423866F" }, + { name = "bath", version = "5.0.0", build_tools = ["gleam"], requirements = ["gleam_deque", "gleam_erlang", "gleam_otp", "gleam_stdlib", "logging"], otp_app = "bath", source = "hex", outer_checksum = "BB7A25E5177BC80E4106BC691E026F756A606E9752350F938F2284DB81D8A2C9" }, { name = "envoy", version = "1.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "envoy", source = "hex", outer_checksum = "95FD059345AA982E89A0B6E2A3BF1CF43E17A7048DCD85B5B65D3B9E4E39D359" }, { name = "gleam_deque", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_deque", source = "hex", outer_checksum = "64D77068931338CF0D0CB5D37522C3E3CCA7CB7D6C5BACB41648B519CC0133C7" }, { name = "gleam_erlang", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "7E6A5234F927C4B24F8054AB1E4572206C41F9E6D5C6C02273CB7531E7E5CED0" }, @@ -15,7 +15,7 @@ packages = [ ] [requirements] -bath = { version = ">= 4.1.0 and < 5.0.0" } +bath = { version = ">= 5.0.0 and < 6.0.0" } envoy = { version = ">= 1.0.2 and < 2.0.0" } gleam_erlang = { version = ">= 1.0.0 and < 2.0.0" } gleam_otp = { version = ">= 1.0.0 and < 2.0.0" } diff --git a/src/valkyrie.gleam b/src/valkyrie.gleam index 64ea744..392c808 100644 --- a/src/valkyrie.gleam +++ b/src/valkyrie.gleam @@ -19,11 +19,6 @@ import valkyrie/resp const protocol_version = 3 -/// The configuration for connecting to a Redis-compatible database. -pub type Config { - Config(host: String, port: Int, auth: Auth) -} - pub type Auth { NoAuth PasswordOnly(String) @@ -32,7 +27,7 @@ pub type Auth { pub opaque type Connection { Single(socket: mug.Socket) - Pooled(bath.Pool(mug.Socket)) + Pooled(process.Subject(bath.Msg(mug.Socket))) } pub type PoolError { @@ -83,6 +78,11 @@ fn auth_to_options_list(auth: Auth) -> List(String) { } } +/// The configuration for connecting to a Redis-compatible database. +pub type Config { + Config(host: String, port: Int, auth: Auth) +} + /// Create a default Redis configuration. /// /// Returns a configuration with: @@ -231,16 +231,23 @@ fn create_pool_builder( config: Config, pool_size: Int, init_timeout: Int, + pool_name: option.Option(process.Name(bath.Msg(mug.Socket))), ) -> bath.Builder(mug.Socket) { - bath.new(fn() { - create_socket(config, init_timeout) - |> result.map_error(error_to_string) - }) - |> bath.size(pool_size) - |> bath.on_shutdown(fn(socket) { - mug.shutdown(socket) - |> result.unwrap(Nil) - }) + let pool_builder = + bath.new(fn() { + create_socket(config, init_timeout) + |> result.map_error(error_to_string) + }) + |> bath.size(pool_size) + |> bath.on_shutdown(fn(socket) { + mug.shutdown(socket) + |> result.unwrap(Nil) + }) + + case pool_name { + option.Some(name) -> bath.name(pool_builder, name) + option.None -> pool_builder + } } /// Start a connection pool for Redis connections. @@ -256,10 +263,11 @@ fn create_pool_builder( pub fn start_pool( config config: Config, size pool_size: Int, + name pool_name: option.Option(process.Name(bath.Msg(mug.Socket))), timeout init_timeout: Int, ) -> Result(Connection, StartError) { use pool <- result.try( - create_pool_builder(config, pool_size, init_timeout) + create_pool_builder(config, pool_size, init_timeout, pool_name) |> bath.start(init_timeout) |> result.map(Pooled) |> result.map_error(ActorStartError), @@ -276,6 +284,9 @@ pub fn start_pool( /// supervision tree. The pool will be automatically restarted if it crashes, /// making this the recommended approach for production applications. /// +/// If you wish to use the supervised pool under a static supervisor, you _must_ +/// provide a name, or you'll have no +/// /// Connections are created lazily when requested from the pool. On creation, /// connections will call `HELLO 3` with any authentication information to authenticate /// with the Redis-compatible server. No additional commands will be sent. @@ -292,21 +303,17 @@ pub fn start_pool( /// import valkyrie /// /// pub fn main() { -/// // Create a subject to receive the connection once the supervision tree has been -/// // started. Use a named subject to make sure we can always receive the connection, -/// // even if our original process crashes. -/// let conn_receiver_name = process.new_name("valkyrie_conn_receiver") -/// let assert Ok(_) = process.register(process.self(), conn_receiver_name) -/// -/// let conn_receiver = process.named_subject(conn_receiver_name) +/// // Create a name to interact with the connection pool once it's started under the +/// // static supervisor. +/// let pool_name = process.new_name("connection_pool") /// /// // Define a pool of 10 connections to the default Redis instance on localhost. /// let valkyrie_child_spec = /// valkyrie.default_config() /// |> valkyrie.supervised_pool( -/// receiver: conn_receiver, /// size: 10, -/// timeout: 1000 +/// name: option.Some(pool_name), +/// timeout: 1000, /// ) /// /// // Start the pool under a supervisor @@ -315,24 +322,29 @@ pub fn start_pool( /// |> supervisor.add(valkyrie_child_spec) /// |> supervisor.start /// -/// // Receive the connection now that the pool is started -/// let assert Ok(conn) = process.receive(conn_receiver, 1000) +/// // Get the connection now that the pool is started +/// let conn = valkyrie.named_connection(pool_name) /// /// // Use the connection. -/// let assert Ok(_) = valkyrie.set(conn, "key", "value", option.None, 1000) -/// let assert Ok(_) = valkyrie.get(conn, "key", 1000) +/// let assert Ok(_) = echo valkyrie.set(conn, "key", "value", option.None, 1000) +/// let assert Ok(_) = echo valkyrie.get(conn, "key", 1000) /// /// // Do more stuff... /// } /// ``` pub fn supervised_pool( config config: Config, - receiver subj: process.Subject(Connection), size pool_size: Int, + name pool_name: option.Option(process.Name(bath.Msg(mug.Socket))), timeout init_timeout: Int, -) -> supervision.ChildSpecification(process.Subject(bath.Msg(mug.Socket))) { - create_pool_builder(config, pool_size, init_timeout) - |> bath.supervised_map(subj, Pooled, init_timeout) +) -> supervision.ChildSpecification(Connection) { + create_pool_builder(config, pool_size, init_timeout, pool_name) + |> bath.supervised_map(Pooled, init_timeout) +} + +/// Create a connection from the process name given to the connection pool. +pub fn named_connection(name: process.Name(bath.Msg(mug.Socket))) -> Connection { + Pooled(process.named_subject(name)) } /// Shut down a Redis connection or connection pool. diff --git a/test/valkyrie_test.gleam b/test/valkyrie_test.gleam index 9b6ce1f..e1a96aa 100644 --- a/test/valkyrie_test.gleam +++ b/test/valkyrie_test.gleam @@ -21,7 +21,7 @@ pub fn main() { fn get_test_conn(next: fn(valkyrie.Connection) -> a) -> a { let assert Ok(conn) = valkyrie.default_config() - |> valkyrie.start_pool(3, 1000) + |> valkyrie.start_pool(3, option.None, 1000) let res = next(conn) let assert Ok(_) = valkyrie.custom(conn, ["FLUSHDB"], 1000) @@ -30,17 +30,17 @@ fn get_test_conn(next: fn(valkyrie.Connection) -> a) -> a { } fn get_supervised_conn(next: fn(valkyrie.Connection) -> a) -> a { - let connection_subject = process.new_subject() + let connection_name = process.new_name("valkyrie_test_pool") let child_spec = valkyrie.default_config() - |> valkyrie.supervised_pool(connection_subject, 3, 1000) + |> valkyrie.supervised_pool(3, option.Some(connection_name), 1000) let assert Ok(_started) = static_supervisor.new(static_supervisor.OneForOne) |> static_supervisor.add(child_spec) |> static_supervisor.start - let assert Ok(conn) = process.receive(connection_subject, 1000) + let conn = valkyrie.named_connection(connection_name) let res = next(conn) let assert Ok(_) = valkyrie.custom(conn, ["FLUSHDB"], 1000) @@ -1847,7 +1847,7 @@ pub fn pool_error_scenarios_test() { ) // This should fail to create connections but not panic - let pool_result = invalid_config |> valkyrie.start_pool(1, 100) + let pool_result = invalid_config |> valkyrie.start_pool(1, option.None, 100) case pool_result { Error(_) -> Nil Ok(conn) -> {