diff --git a/CHANGELOG.md b/CHANGELOG.md index 3742d31..e459a04 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,30 @@ # Changelog +## Unreleased + +### Added + +- **DuckDB support** via `duckdbex` driver + - `SqlKit.DuckDB.connect/2` and `disconnect/1` for direct connections + - `SqlKit.DuckDB.Pool` - NimblePool-based connection pool with supervision + - File-based SQL support via `:backend` option (`backend: {:duckdb, pool: PoolName}`) + - Automatic hugeint to integer conversion + - PostgreSQL-style `$1, $2, ...` parameter placeholders + +- **Prepared statement caching** for DuckDB pools + - Automatic caching of prepared statements per connection + - Configurable via `:cache` option (default: true) + +- **Streaming support** for DuckDB large result sets + - `SqlKit.DuckDB.stream!/3` and `stream_with_columns!/3` for direct connections + - `SqlKit.DuckDB.Pool.with_stream!/5` and `with_stream_and_columns!/6` for pools + - `with_stream!/3` and `with_stream_and_columns!/4` for file-based SQL modules + +- **Pool tuning options** + - `:timeout` option for checkout operations (default: 5000ms) + - Lazy connection initialization + - Documented pool behavior and configuration + ## 0.1.0 - Initial release diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..ade4802 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,263 @@ +# SqlKit + +An Elixir library for executing raw SQL with automatic result transformation. + +## Overview + +SqlKit provides two ways to execute raw SQL with results automatically transformed into maps or structs: + +1. **Direct SQL execution** - Standalone functions for executing SQL strings with any Ecto repo +2. **File-based SQL** - Macro-based approach for compile-time embedded SQL files + +SQL files are embedded at compile-time for production (stored in module attributes) while reading from disk in dev/test for rapid iteration. Supports Postgrex, MyXQL, Exqlite, Tds, Ch (ClickHouse), and DuckDB. + +## Project Structure + +``` +lib/ + sql_kit.ex # Main module with `use SqlKit` macro + standalone query functions + sql_kit/ + config.ex # Runtime config (root_sql_dir, load_sql) + helpers.ex # Compile-time helpers (file_atom) + exceptions.ex # NoResultsError, MultipleResultsError + query.ex # Core query execution logic (shared by both APIs) + duckdb.ex # DuckDB connection management (conditional on duckdbex) + duckdb/ + pool.ex # NimblePool-based connection pool for DuckDB +test/ + sql_kit_test.exs # Main tests covering all databases and both APIs + sql_kit/ + helpers_test.exs # Helpers module tests + duckdb_test.exs # DuckDB-specific tests + support/ + repos.ex # Test Ecto repos (Postgres, MySQL, SQLite, TDS, ClickHouse) + sql/ # Test SQL files per database + data_case.ex # Test case template + test_setup.ex # Database setup/teardown + test_sql_modules.ex # Test SqlKit modules +``` + +## Key Technical Decisions + +- **Two APIs**: Standalone functions for direct SQL execution + macro-generated functions for file-based SQL +- **Shared execution logic**: Both APIs delegate to `SqlKit.Query` for consistent behavior +- **Compile-time embedding**: SQL files are read once at compile time and stored as module attributes with `persist: true` +- **Runtime file reads in dev/test**: Allows editing SQL without recompilation via `:load_sql` config +- **Direct driver support**: Pattern matches on result structs with `columns` and `rows` fields +- **Atom safety**: Use `String.to_existing_atom/1` for column names (requires struct fields to pre-exist) +- **Configurable**: `otp_app`, `repo`/`backend`, `dirname`, and `root_sql_dir` are configurable +- **Backend abstraction**: File-based SQL supports both Ecto repos (`:repo`) and DuckDB pools (`:backend`) + +## Core API + +### Standalone Functions (SqlKit module) + +```elixir +# Execute SQL strings directly with any Ecto repo +SqlKit.query_all!(MyApp.Repo, "SELECT * FROM users WHERE age > $1", [21]) +# => [%{id: 1, name: "Alice", age: 30}, ...] + +SqlKit.query_one!(MyApp.Repo, "SELECT * FROM users WHERE id = $1", [1]) +# => %{id: 1, name: "Alice"} + +SqlKit.query_all!(MyApp.Repo, "SELECT * FROM users", [], as: User) +# => [%User{id: 1, name: "Alice"}, ...] + +# Non-bang variants +SqlKit.query_all(repo, sql, params, opts) # => {:ok, results} | {:error, reason} +SqlKit.query_one(repo, sql, params, opts) # => {:ok, result | nil} | {:error, reason} + +# Aliases for query_one +SqlKit.query!(repo, sql, params, opts) +SqlKit.query(repo, sql, params, opts) +``` + +### File-Based Functions (generated by `use SqlKit`) + +```elixir +# With Ecto repo +defmodule MyApp.Reports.SQL do + use SqlKit, + otp_app: :my_app, + repo: MyApp.Repo, + dirname: "reports", + files: ["stats.sql", "activity.sql"] +end + +# With DuckDB pool (use :backend instead of :repo) +defmodule MyApp.Analytics.SQL do + use SqlKit, + otp_app: :my_app, + backend: {:duckdb, pool: MyApp.AnalyticsPool}, + dirname: "analytics", + files: ["daily_summary.sql"] +end + +# Usage (same API for both) +MyApp.Reports.SQL.query!("stats.sql", [id]) # single row (alias for query_one!) +MyApp.Reports.SQL.query_one!("stats.sql", [id]) # single row +MyApp.Reports.SQL.query_all!("activity.sql", [id], as: Activity) # all rows as structs +MyApp.Reports.SQL.load!("stats.sql") # just get SQL string + +# Non-bang variants return {:ok, result} | {:error, reason} +MyApp.Reports.SQL.query("stats.sql", [id]) +MyApp.Reports.SQL.query_one("stats.sql", [id]) +MyApp.Reports.SQL.query_all("activity.sql", [id]) +MyApp.Reports.SQL.load("stats.sql") +``` + +### Utility Functions + +```elixir +# Transform raw columns/rows into maps or structs (used internally, also public) +SqlKit.transform_rows(["id", "name"], [[1, "Alice"]], as: User) +# => [%User{id: 1, name: "Alice"}] + +# Extract columns and rows from driver result +SqlKit.extract_result(result) +# => {["id", "name"], [[1, "Alice"]]} +``` + +## Supported Databases + +| Database | Ecto Adapter | Driver | +|------------|---------------------------|----------| +| PostgreSQL | Ecto.Adapters.Postgres | Postgrex | +| MySQL | Ecto.Adapters.MyXQL | MyXQL | +| SQLite | Ecto.Adapters.SQLite3 | Exqlite | +| SQL Server | Ecto.Adapters.Tds | Tds | +| ClickHouse | Ecto.Adapters.ClickHouse | Ch | +| DuckDB | N/A (direct driver) | Duckdbex | + +### DuckDB Support + +DuckDB is unique - it's not an Ecto adapter but a direct NIF driver. SqlKit provides first-class support: + +```elixir +# Direct connection (BYO) +{:ok, conn} = SqlKit.DuckDB.connect(":memory:") +SqlKit.query_all!(conn, "SELECT * FROM users", []) +SqlKit.DuckDB.disconnect(conn) + +# Pooled connection (recommended for production) +# Add to supervision tree: +{SqlKit.DuckDB.Pool, name: MyPool, database: "analytics.duckdb", pool_size: 4} + +# With custom Duckdbex config: +{SqlKit.DuckDB.Pool, name: MyPool, database: "analytics.duckdb", pool_size: 4, + config: %Duckdbex.Config{threads: 4}} + +# Then use the pool: +{:ok, pool} = SqlKit.DuckDB.Pool.start_link(name: MyPool, database: ":memory:") +SqlKit.query_all!(pool, "SELECT * FROM events", []) + +# File-based SQL with DuckDB (use :backend instead of :repo) +defmodule MyApp.Analytics.SQL do + use SqlKit, + otp_app: :my_app, + backend: {:duckdb, pool: MyApp.AnalyticsPool}, + dirname: "analytics", + files: ["daily_summary.sql"] +end + +MyApp.Analytics.SQL.query_all!("daily_summary.sql", [~D[2024-01-01]]) +``` + +Key differences from Ecto-based databases: +- Uses PostgreSQL-style `$1, $2, ...` parameter placeholders +- In-memory database: use `":memory:"` string +- Pool uses NimblePool (connections share one database instance) +- Hugeint values auto-converted to Elixir integers +- Extensions loaded via SQL: `INSTALL 'parquet'; LOAD 'parquet';` +- File-based SQL uses `:backend` option instead of `:repo` + +### DuckDB Pool Features + +**Prepared Statement Caching**: Pool queries automatically cache prepared statements per connection. Repeated queries with the same SQL skip the prepare step. + +```elixir +# Caching is enabled by default +SqlKit.query_all!(pool, "SELECT * FROM events WHERE id = $1", [1]) +SqlKit.query_all!(pool, "SELECT * FROM events WHERE id = $1", [2]) # uses cached statement + +# Disable caching for specific queries +SqlKit.DuckDB.Pool.query!(pool, sql, params, cache: false) +``` + +**Streaming Large Results**: For memory-efficient processing of large result sets: + +```elixir +# Direct connection streaming +conn +|> SqlKit.DuckDB.stream!("SELECT * FROM large_table", []) +|> Stream.flat_map(& &1) +|> Enum.take(100) + +# With column names +{columns, stream} = SqlKit.DuckDB.stream_with_columns!(conn, sql, []) + +# Pool streaming (callback-based to manage connection lifecycle) +SqlKit.DuckDB.Pool.with_stream!(pool, "SELECT * FROM events", [], fn stream -> + stream |> Stream.flat_map(& &1) |> Enum.count() +end) + +# File-based SQL streaming (DuckDB backends only) +MyApp.Analytics.SQL.with_stream!("large_query.sql", [], fn stream -> + stream |> Stream.flat_map(& &1) |> Enum.take(1000) +end) +``` + +**Pool Timeout**: All pool operations accept a `:timeout` option (default: 5000ms): + +```elixir +SqlKit.DuckDB.Pool.query!(pool, sql, params, timeout: 10_000) +SqlKit.DuckDB.Pool.checkout!(pool, fn conn -> ... end, timeout: 10_000) +``` + +## Configuration + +Users configure in their app's config: + +```elixir +# config/config.exs +config :my_app, SqlKit, + root_sql_dir: "priv/repo/sql" # default + +# config/dev.exs and config/test.exs +config :my_app, SqlKit, + load_sql: :dynamic # read from disk at runtime + +# config/prod.exs (or rely on default) +config :my_app, SqlKit, + load_sql: :compiled # use compile-time embedded SQL (default) +``` + +## Commands + +```bash +mix check # Run all checks (format, compile, dialyzer, credo, sobelow, test) +mix test # Run tests (requires all databases running via Docker) +mix format # Format code +mix credo # Linting +mix dialyzer # Type checking +mix sobelow # Security analysis +``` + +## Dependencies + +Runtime: +- ecto_sql ~> 3.0 +- nimble_pool ~> 1.1 +- postgrex ~> 0.19 (optional) +- myxql ~> 0.7 (optional) +- ecto_sqlite3 ~> 0.18 (optional) +- tds ~> 2.3 (optional) +- ecto_ch ~> 0.7 (optional) +- duckdbex ~> 0.3.19 (optional) + +Dev/Test: +- ex_check ~> 0.16 +- credo ~> 1.7 +- dialyxir ~> 1.4 +- sobelow ~> 0.14 +- styler ~> 1.10 diff --git a/README.md b/README.md index d4a78c2..967e1cd 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ For file-based SQL, keeping queries in `.sql` files brings other practical benef - **Two APIs**: Execute SQL strings directly or load from files - **Compile-time embedding**: File-based SQL read once at compile time and stored as module attributes - **Dynamic loading in dev/test**: Edit SQL files without recompiling -- **Multi-database support**: Works with PostgreSQL, MySQL/MariaDB, SQLite, SQL Server, and ClickHouse +- **Multi-database support**: Works with PostgreSQL, MySQL/MariaDB, SQLite, SQL Server, ClickHouse, and DuckDB ## Supported Databases @@ -36,6 +36,7 @@ For file-based SQL, keeping queries in `.sql` files brings other practical benef | MariaDB | Ecto.Adapters.MyXQL | MyXQL | | SQL Server | Ecto.Adapters.Tds | Tds | | ClickHouse | Ecto.Adapters.ClickHouse | Ch | +| DuckDB | N/A (direct driver) | Duckdbex | ## Installation @@ -49,6 +50,17 @@ def deps do end ``` +For DuckDB support, also add `duckdbex`: + +```elixir +def deps do + [ + {:sql_kit, "~> 0.1.0"}, + {:duckdbex, "~> 0.3"} + ] +end +``` + ## Quick Start ### Direct SQL Execution @@ -128,6 +140,133 @@ MyApp.Reports.SQL.load!("stats.sql") # => "SELECT id, name, total_sales..." ``` +## DuckDB + +DuckDB is a high-performance analytical database. Unlike other supported databases, DuckDB is not an Ecto adapter—SqlKit provides direct integration via the `duckdbex` driver. + +### Direct Connection + +For scripts, one-off analysis, or simple use cases: + +```elixir +# In-memory database +{:ok, conn} = SqlKit.DuckDB.connect(":memory:") +SqlKit.query_all!(conn, "SELECT 1 as num", []) +# => [%{num: 1}] +SqlKit.DuckDB.disconnect(conn) + +# File-based database +{:ok, conn} = SqlKit.DuckDB.connect("analytics.duckdb") +SqlKit.query_all!(conn, "SELECT * FROM events", []) +SqlKit.DuckDB.disconnect(conn) + +# With custom configuration +{:ok, conn} = SqlKit.DuckDB.connect("analytics.duckdb", + config: %Duckdbex.Config{threads: 4}) +``` + +### Pooled Connection (Recommended for Production) + +For production use, add the pool to your supervision tree: + +```elixir +# In your application.ex +def start(_type, _args) do + children = [ + # ... other children + {SqlKit.DuckDB.Pool, + name: MyApp.AnalyticsPool, + database: "priv/analytics.duckdb", + pool_size: 4} + ] + + Supervisor.start_link(children, strategy: :one_for_one) +end +``` + +Pool options: +- `:name` - Required. The name to register the pool under +- `:database` - Required. Path to database file or `":memory:"` +- `:pool_size` - Number of connections. Default: 4 +- `:config` - Optional `Duckdbex.Config` struct for advanced configuration (threads, memory limits, etc.) + +Then query using the pool: + +```elixir +pool = SqlKit.DuckDB.Pool.pool(MyApp.AnalyticsPool) +SqlKit.query_all!(pool, "SELECT * FROM events WHERE date > $1", [~D[2024-01-01]]) +# => [%{id: 1, date: ~D[2024-01-15], ...}, ...] +``` + +### File-Based SQL with DuckDB + +Use the `:backend` option instead of `:repo`: + +```elixir +defmodule MyApp.Analytics.SQL do + use SqlKit, + otp_app: :my_app, + backend: {:duckdb, pool: MyApp.AnalyticsPool}, + dirname: "analytics", + files: ["daily_summary.sql", "user_activity.sql"] +end + +# Usage +MyApp.Analytics.SQL.query_all!("daily_summary.sql", [~D[2024-01-01]]) +``` + +### Loading Extensions + +DuckDB extensions (Parquet, JSON, HTTPFS, etc.) are loaded via SQL: + +```elixir +pool = SqlKit.DuckDB.Pool.pool(MyApp.AnalyticsPool) +SqlKit.query!(pool, "INSTALL 'parquet';", []) +SqlKit.query!(pool, "LOAD 'parquet';", []) +SqlKit.query_all!(pool, "SELECT * FROM 'data.parquet'", []) +``` + +### Streaming Large Results + +For memory-efficient processing of large result sets: + +```elixir +# Direct connection streaming +conn +|> SqlKit.DuckDB.stream!("SELECT * FROM large_table", []) +|> Stream.flat_map(& &1) +|> Enum.take(100) + +# Pool streaming (callback-based) +SqlKit.DuckDB.Pool.with_stream!(pool, "SELECT * FROM events", [], fn stream -> + stream |> Stream.flat_map(& &1) |> Enum.count() +end) + +# File-based SQL streaming (DuckDB backends only) +MyApp.Analytics.SQL.with_stream!("large_query.sql", [], fn stream -> + stream |> Stream.flat_map(& &1) |> Enum.take(1000) +end) +``` + +### Pool Options + +Pool operations accept these options: +- `:timeout` - Checkout timeout in milliseconds (default: 5000) +- `:cache` - Enable prepared statement caching (default: true) + +```elixir +SqlKit.DuckDB.Pool.query!(pool, sql, params, timeout: 10_000, cache: false) +``` + +### Key Differences from Ecto-Based Databases + +- Uses PostgreSQL-style `$1, $2, ...` parameter placeholders +- In-memory database: use `":memory:"` string (not `:memory` atom) +- Pool uses NimblePool (connections share one database instance) +- Pool automatically caches prepared statements for repeated queries +- Hugeint values are automatically converted to Elixir integers +- Date/Time values are returned as tuples (e.g., `{2024, 1, 15}` for dates) + ## Configuration ```elixir @@ -155,6 +294,7 @@ Each database uses different parameter placeholder syntax: | SQLite | `?` | `WHERE id = ? AND age > ?` | | SQL Server | `@1`, `@2`, ... | `WHERE id = @1 AND age > @2` | | ClickHouse | `{name:Type}` | `WHERE id = {id:UInt32} AND age > {age:UInt32}` | +| DuckDB | `$1`, `$2`, ... | `WHERE id = $1 AND age > $2` | ### ClickHouse Named Parameters @@ -207,10 +347,13 @@ This pattern gives you named parameters through Elixir function arguments while ## Use SqlKit Options - `:otp_app` (required) - Your application name -- `:repo` (required) - The Ecto repo module to use for queries +- `:repo` - The Ecto repo module to use for queries (required unless `:backend` is specified) +- `:backend` - Alternative to `:repo` for non-Ecto databases. Supports `{:duckdb, pool: PoolName}` - `:dirname` (required) - Subdirectory within root_sql_dir for this module's SQL files - `:files` (required) - List of SQL filenames to load +Note: You must specify either `:repo` or `:backend`, but not both. + ## API Reference ### Standalone Functions @@ -413,7 +556,7 @@ SQL.load("users.sql") mix test ``` -The test suite runs against all supported databases (PostgreSQL, MySQL, SQLite, SQL Server, and ClickHouse). All databases must be running for the full test suite to pass. +The test suite runs against all supported databases (PostgreSQL, MySQL, SQLite, SQL Server, ClickHouse, and DuckDB). All databases must be running for the full test suite to pass. ### Database Ports @@ -422,7 +565,7 @@ The test suite runs against all supported databases (PostgreSQL, MySQL, SQLite, - SQL Server: 1433 - ClickHouse: 8123, 9000 -SQLite uses a local file and doesn't require Docker. +SQLite and DuckDB use local files/memory and don't require Docker. ### Before Pull Request diff --git a/lib/sql_kit.ex b/lib/sql_kit.ex index af77994..37b7ad9 100644 --- a/lib/sql_kit.ex +++ b/lib/sql_kit.ex @@ -29,6 +29,30 @@ defmodule SqlKit do MyApp.Reports.SQL.query_one!("stats.sql", [report_id]) + ### DuckDB File-Based SQL + + For DuckDB, use the `:backend` option instead of `:repo`. The pool must be + started in your supervision tree with the database configuration: + + # In your application.ex supervision tree: + children = [ + {SqlKit.DuckDB.Pool, + name: MyApp.AnalyticsPool, + database: "priv/analytics.duckdb", + pool_size: 4} + ] + + # Then define your SQL module: + defmodule MyApp.Analytics.SQL do + use SqlKit, + otp_app: :my_app, + backend: {:duckdb, pool: MyApp.AnalyticsPool}, + dirname: "analytics", + files: ["daily_summary.sql"] + end + + MyApp.Analytics.SQL.query_all!("daily_summary.sql", [~D[2024-01-01]]) + ## Supported Databases Any Ecto adapter returning a result map containing rows and columns should work. @@ -59,10 +83,14 @@ defmodule SqlKit do ## Options - - `:otp_app` (required for file-based) - Your application name - - `:repo` (required for file-based) - The Ecto repo module to use for queries - - `:dirname` (required for file-based) - Subdirectory within root_sql_dir for this module's SQL files - - `:files` (required for file-based) - List of SQL filenames to load + - `:otp_app` (required) - Your application name + - `:repo` - The Ecto repo module to use for queries (required unless `:backend` is specified) + - `:backend` - Alternative to `:repo` for non-Ecto databases. Currently supports: + - `{:duckdb, pool: PoolName}` - Use a DuckDB connection pool + - `:dirname` (required) - Subdirectory within root_sql_dir for this module's SQL files + - `:files` (required) - List of SQL filenames to load + + Note: You must specify either `:repo` or `:backend`, but not both. """ alias SqlKit.Config @@ -72,9 +100,26 @@ defmodule SqlKit do # Standalone Query Functions # ============================================================================ + @typedoc """ + A backend for executing SQL queries. + + Can be: + - An Ecto repo module (e.g., `MyApp.Repo`) + - A `SqlKit.DuckDB.Connection` struct (for direct DuckDB connections) + - A `SqlKit.DuckDB.Pool` name (atom) for pooled DuckDB connections + """ + @type backend :: Ecto.Repo.t() | struct() | atom() + @doc """ Executes a SQL query and returns all rows as a list of maps or structs. + ## Backend + + The first argument can be: + - An Ecto repo module (e.g., `MyApp.Repo`) + - A `SqlKit.DuckDB.Connection` struct + - A `SqlKit.DuckDB.Pool` name (atom) + ## Options - `:as` - Struct module to cast results into @@ -83,6 +128,7 @@ defmodule SqlKit do ## Examples + # With Ecto repo SqlKit.query_all!(MyApp.Repo, "SELECT * FROM users") # => [%{id: 1, name: "Alice"}, %{id: 2, name: "Bob"}] @@ -91,10 +137,18 @@ defmodule SqlKit do SqlKit.query_all!(MyApp.Repo, "SELECT * FROM users", [], as: User) # => [%User{id: 1, name: "Alice"}, %User{id: 2, name: "Bob"}] + + # With DuckDB connection + {:ok, conn} = SqlKit.DuckDB.connect(":memory:") + SqlKit.query_all!(conn, "SELECT 1 as num", []) + # => [%{num: 1}] + + # With DuckDB pool + SqlKit.query_all!(MyApp.DuckDBPool, "SELECT * FROM events", []) """ - @spec query_all!(Ecto.Repo.t(), String.t(), list() | map(), keyword()) :: [map() | struct()] - def query_all!(repo, sql, params \\ [], opts \\ []) do - SqlKit.Query.all!(repo, sql, params, opts) + @spec query_all!(backend(), String.t(), list() | map(), keyword()) :: [map() | struct()] + def query_all!(backend, sql, params \\ [], opts \\ []) do + SqlKit.Query.all!(backend, sql, params, opts) end @doc """ @@ -102,21 +156,17 @@ defmodule SqlKit do Returns `{:ok, results}` on success, `{:error, exception}` on failure. - ## Options - - - `:as` - Struct module to cast results into - - `:unsafe_atoms` - If `true`, uses `String.to_atom/1` instead of - `String.to_existing_atom/1` for column names. Default: `false` + See `query_all!/4` for backend and options documentation. ## Examples SqlKit.query_all(MyApp.Repo, "SELECT * FROM users") # => {:ok, [%{id: 1, name: "Alice"}, %{id: 2, name: "Bob"}]} """ - @spec query_all(Ecto.Repo.t(), String.t(), list() | map(), keyword()) :: + @spec query_all(backend(), String.t(), list() | map(), keyword()) :: {:ok, [map() | struct()]} | {:error, term()} - def query_all(repo, sql, params \\ [], opts \\ []) do - SqlKit.Query.all(repo, sql, params, opts) + def query_all(backend, sql, params \\ [], opts \\ []) do + SqlKit.Query.all(backend, sql, params, opts) end @doc """ @@ -125,6 +175,8 @@ defmodule SqlKit do Raises `SqlKit.NoResultsError` if no rows are returned. Raises `SqlKit.MultipleResultsError` if more than one row is returned. + See `query_all!/4` for backend documentation. + ## Options - `:as` - Struct module to cast result into @@ -140,9 +192,9 @@ defmodule SqlKit do SqlKit.query_one!(MyApp.Repo, "SELECT * FROM users WHERE id = $1", [1], as: User) # => %User{id: 1, name: "Alice"} """ - @spec query_one!(Ecto.Repo.t(), String.t(), list() | map(), keyword()) :: map() | struct() - def query_one!(repo, sql, params \\ [], opts \\ []) do - SqlKit.Query.one!(repo, sql, params, opts) + @spec query_one!(backend(), String.t(), list() | map(), keyword()) :: map() | struct() + def query_one!(backend, sql, params \\ [], opts \\ []) do + SqlKit.Query.one!(backend, sql, params, opts) end @doc """ @@ -151,6 +203,8 @@ defmodule SqlKit do Returns `{:ok, result}` on exactly one result, `{:ok, nil}` on no results, or `{:error, exception}` on multiple results or other errors. + See `query_all!/4` for backend documentation. + ## Options - `:as` - Struct module to cast result into @@ -166,39 +220,108 @@ defmodule SqlKit do SqlKit.query_one(MyApp.Repo, "SELECT * FROM users WHERE id = $1", [999]) # => {:ok, nil} """ - @spec query_one(Ecto.Repo.t(), String.t(), list() | map(), keyword()) :: + @spec query_one(backend(), String.t(), list() | map(), keyword()) :: {:ok, map() | struct() | nil} | {:error, term()} - def query_one(repo, sql, params \\ [], opts \\ []) do - SqlKit.Query.one(repo, sql, params, opts) + def query_one(backend, sql, params \\ [], opts \\ []) do + SqlKit.Query.one(backend, sql, params, opts) end @doc """ Alias for `query_one!/4`. See `query_one!/4` documentation. """ - @spec query!(Ecto.Repo.t(), String.t(), list() | map(), keyword()) :: map() | struct() - def query!(repo, sql, params \\ [], opts \\ []) do - SqlKit.Query.one!(repo, sql, params, opts) + @spec query!(backend(), String.t(), list() | map(), keyword()) :: map() | struct() + def query!(backend, sql, params \\ [], opts \\ []) do + SqlKit.Query.one!(backend, sql, params, opts) end @doc """ Alias for `query_one/4`. See `query_one/4` documentation. """ - @spec query(Ecto.Repo.t(), String.t(), list() | map(), keyword()) :: + @spec query(backend(), String.t(), list() | map(), keyword()) :: {:ok, map() | struct() | nil} | {:error, term()} - def query(repo, sql, params \\ [], opts \\ []) do - SqlKit.Query.one(repo, sql, params, opts) + def query(backend, sql, params \\ [], opts \\ []) do + SqlKit.Query.one(backend, sql, params, opts) end # ============================================================================ # File-Based Macro # ============================================================================ + # Validates backend configuration at compile time. + # Returns {:ecto, repo_module} or {:duckdb, %{pool: pool_name}} + @doc false + def validate_backend_config!(opts, caller) do + repo = Keyword.get(opts, :repo) + backend = Keyword.get(opts, :backend) + + cond do + repo != nil and backend != nil -> + raise CompileError, + description: + "Cannot specify both :repo and :backend options. Use :repo for Ecto repos or :backend for DuckDB pools.", + file: caller.file, + line: caller.line + + repo != nil -> + expanded_repo = Macro.expand(repo, caller) + + if not is_atom(expanded_repo) do + raise CompileError, + description: ":repo must be an atom (module name). Got: #{inspect(repo)}", + file: caller.file, + line: caller.line + end + + {:ecto, expanded_repo} + + backend != nil -> + validate_backend_option!(backend, caller) + + true -> + raise CompileError, + description: "Missing required option: either :repo or :backend must be specified.", + file: caller.file, + line: caller.line + end + end + + defp validate_backend_option!({:duckdb, duckdb_opts}, caller) when is_list(duckdb_opts) do + pool = Keyword.get(duckdb_opts, :pool) + + if pool == nil do + raise CompileError, + description: "DuckDB backend requires :pool option. Example: backend: {:duckdb, pool: MyApp.DuckDBPool}", + file: caller.file, + line: caller.line + end + + expanded_pool = Macro.expand(pool, caller) + + if not is_atom(expanded_pool) do + raise CompileError, + description: "DuckDB :pool must be an atom (module name). Got: #{inspect(pool)}", + file: caller.file, + line: caller.line + end + + {:duckdb, %{pool: expanded_pool}} + end + + defp validate_backend_option!(other, caller) do + raise CompileError, + description: "Invalid :backend option. Expected {:duckdb, pool: PoolName}, got: #{inspect(other)}", + file: caller.file, + line: caller.line + end + defmacro __using__(opts) do otp_app = Keyword.fetch!(opts, :otp_app) - repo = Keyword.fetch!(opts, :repo) dirname = Keyword.fetch!(opts, :dirname) files = Keyword.fetch!(opts, :files) + # Validate backend configuration - either :repo or :backend required + {backend_type, backend_config} = validate_backend_config!(opts, __CALLER__) + # Build the SQL directory path at compile time root_sql_dir = Config.root_sql_dir(otp_app) sql_dir = Path.join(root_sql_dir, dirname) @@ -226,7 +349,8 @@ defmodule SqlKit do quote do @otp_app unquote(otp_app) - @repo unquote(repo) + @backend_type unquote(backend_type) + @backend_config unquote(Macro.escape(backend_config)) @sql_dir unquote(sql_dir) @doc """ @@ -327,7 +451,21 @@ defmodule SqlKit do @spec query_all!(String.t(), list() | map(), keyword()) :: [map() | struct()] def query_all!(filename, params \\ [], opts \\ []) do sql = load!(filename) - SqlKit.Query.all!(@repo, sql, params, opts) + backend = get_backend() + SqlKit.Query.all!(backend, sql, params, opts) + end + + # Returns the configured backend for query execution. + # For Ecto repos, returns the repo module. + # For DuckDB pools, returns a pool reference struct. + @doc false + case @backend_type do + :ecto -> + def get_backend, do: @backend_config + + :duckdb -> + @pool_name @backend_config.pool + def get_backend, do: SqlKit.DuckDB.Pool.pool(@pool_name) end @doc """ @@ -422,6 +560,67 @@ defmodule SqlKit do """ @spec query!(String.t(), list() | map(), keyword()) :: map() | struct() defdelegate query!(filename, params \\ [], opts \\ []), to: __MODULE__, as: :query_one! + + # DuckDB-specific streaming functions + # Only defined when backend is :duckdb + if @backend_type == :duckdb do + @doc """ + Executes a SQL query and streams results through a callback function. + + Only available for DuckDB backends. The connection is held for the + duration of the callback, which receives a stream of result chunks. + + ## Examples + + # Count rows without loading all into memory + MyApp.Analytics.SQL.with_stream!("large_query.sql", [], fn stream -> + stream + |> Stream.flat_map(& &1) + |> Enum.reduce(0, fn _row, count -> count + 1 end) + end) + + # Process first 100 rows + MyApp.Analytics.SQL.with_stream!("events.sql", [~D[2024-01-01]], fn stream -> + stream + |> Stream.flat_map(& &1) + |> Enum.take(100) + end) + + """ + @spec with_stream!(String.t(), list(), (Enumerable.t() -> result)) :: result + when result: term() + def with_stream!(filename, params \\ [], fun) do + sql = load!(filename) + pool = get_backend() + SqlKit.DuckDB.Pool.with_stream!(pool, sql, params, fun) + end + + @doc """ + Like `with_stream!/3` but also provides column names to the callback. + + The callback receives `{columns, stream}` where `columns` is a list of + column names and `stream` is the chunk stream. + + ## Examples + + MyApp.Analytics.SQL.with_stream_and_columns!("users.sql", [], fn {cols, stream} -> + IO.inspect(cols) # => ["id", "name", "age"] + stream |> Stream.flat_map(& &1) |> Enum.to_list() + end) + + """ + @spec with_stream_and_columns!( + String.t(), + list(), + ({[String.t()], Enumerable.t()} -> result) + ) :: result + when result: term() + def with_stream_and_columns!(filename, params \\ [], fun) do + sql = load!(filename) + pool = get_backend() + SqlKit.DuckDB.Pool.with_stream_and_columns!(pool, sql, params, fun) + end + end end end diff --git a/lib/sql_kit/duckdb.ex b/lib/sql_kit/duckdb.ex new file mode 100644 index 0000000..c7d9e9f --- /dev/null +++ b/lib/sql_kit/duckdb.ex @@ -0,0 +1,285 @@ +if Code.ensure_loaded?(Duckdbex) do + defmodule SqlKit.DuckDB do + @moduledoc """ + DuckDB support for SqlKit. + + Provides two ways to use DuckDB with SqlKit: + + ## Direct Connection (BYO) + + For simple use cases, scripts, or explicit control: + + {:ok, conn} = SqlKit.DuckDB.connect(":memory:") + SqlKit.query_all!(conn, "SELECT 1 as num", []) + # => [%{num: 1}] + SqlKit.DuckDB.disconnect(conn) + + ## Pooled Connection (Recommended for Production) + + For production use, add the pool to your supervision tree: + + children = [ + {SqlKit.DuckDB.Pool, + name: MyApp.AnalyticsPool, + database: "priv/analytics.duckdb", + pool_size: 4} + ] + + # Then use the pool name with SqlKit functions + SqlKit.query_all!(MyApp.AnalyticsPool, "SELECT * FROM events", []) + + ## Loading Extensions + + DuckDB extensions are loaded via SQL (SQL-first philosophy): + + SqlKit.query!(conn, "INSTALL 'parquet';", []) + SqlKit.query!(conn, "LOAD 'parquet';", []) + SqlKit.query_all!(conn, "SELECT * FROM 'data.parquet'", []) + + ## Notes + + - Uses PostgreSQL-style `$1, $2, ...` parameter placeholders + - In-memory database: use `":memory:"` string (not `:memory` atom) + - Hugeint values are automatically converted to Elixir integers + """ + + defmodule Connection do + @moduledoc """ + Struct representing a DuckDB connection. + + Contains references to the database and connection that are managed + by duckdbex NIFs. Use `SqlKit.DuckDB.connect/1,2` to create and + `SqlKit.DuckDB.disconnect/1` to close. + """ + defstruct [:db, :conn] + + @type t :: %__MODULE__{ + db: reference(), + conn: reference() + } + end + + @type connect_opts :: [config: struct()] + + @doc """ + Opens a DuckDB database and creates a connection. + + ## Arguments + + - `database` - Path to database file or `":memory:"` for in-memory database + + ## Options + + - `:config` - A `Duckdbex.Config` struct for advanced configuration + + ## Examples + + # In-memory database + {:ok, conn} = SqlKit.DuckDB.connect(":memory:") + + # File-based database + {:ok, conn} = SqlKit.DuckDB.connect("analytics.duckdb") + + # With configuration + {:ok, conn} = SqlKit.DuckDB.connect("analytics.duckdb", + config: %Duckdbex.Config{threads: 4}) + + """ + @spec connect(String.t(), connect_opts()) :: {:ok, Connection.t()} | {:error, term()} + def connect(database, opts \\ []) do + config = Keyword.get(opts, :config) + + with {:ok, db} <- open_database(database, config), + {:ok, conn} <- Duckdbex.connection(db) do + {:ok, %Connection{db: db, conn: conn}} + end + end + + @doc """ + Opens a DuckDB database and creates a connection. Raises on error. + + See `connect/2` for options. + """ + @spec connect!(String.t(), connect_opts()) :: Connection.t() + def connect!(database, opts \\ []) do + case connect(database, opts) do + {:ok, conn} -> conn + {:error, reason} -> raise "Failed to connect to DuckDB: #{inspect(reason)}" + end + end + + @doc """ + Closes a DuckDB connection and releases the database. + + ## Examples + + {:ok, conn} = SqlKit.DuckDB.connect(":memory:") + :ok = SqlKit.DuckDB.disconnect(conn) + + """ + @spec disconnect(Connection.t()) :: :ok + def disconnect(%Connection{db: db}) do + # Release the database (this releases associated connections) + Duckdbex.release(db) + :ok + end + + @doc """ + Executes a SQL query and returns columns and rows. + + This is a low-level function. Users should typically use + `SqlKit.query_all!/3`, `SqlKit.query_one!/3`, etc. instead. + + ## Examples + + {:ok, {columns, rows}} = SqlKit.DuckDB.query(conn, "SELECT 1 as num", []) + # => {:ok, {["num"], [[1]]}} + + """ + @spec query(Connection.t(), String.t(), list()) :: + {:ok, {[String.t()], [[term()]]}} | {:error, term()} + def query(%Connection{conn: conn}, sql, params) do + with {:ok, result_ref} <- execute_query(conn, sql, params) do + columns = Duckdbex.columns(result_ref) + rows = Duckdbex.fetch_all(result_ref) + {:ok, {columns, convert_hugeints(rows)}} + end + end + + @doc """ + Executes a SQL query and returns columns and rows. Raises on error. + + See `query/3` for details. + """ + @spec query!(Connection.t(), String.t(), list()) :: {[String.t()], [[term()]]} + def query!(%Connection{} = conn, sql, params) do + case query(conn, sql, params) do + {:ok, result} -> result + {:error, reason} -> raise "DuckDB query failed: #{inspect(reason)}" + end + end + + # ============================================================================ + # Streaming (Large Result Sets) + # ============================================================================ + + @doc """ + Executes a SQL query and returns a stream of result chunks. + + Unlike `query!/3` which loads all results into memory, `stream!/3` returns + a lazy `Stream` that fetches results in chunks. This is useful for large + result sets that would otherwise consume too much memory. + + Each chunk is a list of rows (each row is a list of values). Use + `Stream.flat_map/2` to iterate over individual rows. + + ## Examples + + # Stream large result set + conn + |> SqlKit.DuckDB.stream!("SELECT * FROM large_table", []) + |> Stream.flat_map(& &1) + |> Enum.take(100) + + # With column names (first element of tuple) + {columns, row_stream} = SqlKit.DuckDB.stream_with_columns!(conn, sql, []) + rows = row_stream |> Stream.flat_map(& &1) |> Enum.to_list() + + ## Notes + + - The connection is held for the duration of stream consumption + - Hugeint values are automatically converted to Elixir integers + - For pooled connections, use `SqlKit.DuckDB.Pool.with_stream!/5` + + """ + @spec stream!(Connection.t(), String.t(), list()) :: Enumerable.t() + def stream!(%Connection{conn: conn}, sql, params) do + {:ok, result_ref} = execute_query(conn, sql, params) + build_chunk_stream(result_ref) + end + + @doc """ + Like `stream!/3` but also returns column names. + + Returns `{columns, row_chunk_stream}` where `columns` is a list of column + names and `row_chunk_stream` is a stream of row chunks. + + ## Examples + + {columns, stream} = SqlKit.DuckDB.stream_with_columns!(conn, "SELECT * FROM users", []) + # columns => ["id", "name", "age"] + rows = stream |> Stream.flat_map(& &1) |> Enum.to_list() + + """ + @spec stream_with_columns!(Connection.t(), String.t(), list()) :: + {[String.t()], Enumerable.t()} + def stream_with_columns!(%Connection{conn: conn}, sql, params) do + {:ok, result_ref} = execute_query(conn, sql, params) + columns = Duckdbex.columns(result_ref) + {columns, build_chunk_stream(result_ref)} + end + + # Builds a stream that fetches chunks from a result reference + defp build_chunk_stream(result_ref) do + Stream.resource( + fn -> result_ref end, + fn ref -> + case Duckdbex.fetch_chunk(ref) do + # Empty chunk means end of results + [] -> {:halt, ref} + chunk -> {[convert_hugeints(chunk)], ref} + end + end, + fn _ref -> :ok end + ) + end + + # ============================================================================ + # Hugeint Conversion (Public - used by Pool) + # ============================================================================ + + @doc """ + Converts hugeint tuples to integers in result rows. + + Duckdbex represents HUGEINT (128-bit integers) as `{upper, lower}` tuples. + This function recursively converts all such tuples to Elixir integers. + + This is safe because other duckdbex tuple types have different arities: + - DATE: `{year, month, day}` + - TIME: `{hour, minute, second, microsecond}` + - DECIMAL: `{value, precision, scale}` + - TIMESTAMP: `{{y, m, d}, {h, m, s, us}}` + + If duckdbex adds another 2-integer-tuple type in the future, this would + need to be updated. Check duckdbex changelog on upgrades. + """ + @spec convert_hugeints([[term()]]) :: [[term()]] + def convert_hugeints(rows) do + Enum.map(rows, fn row -> + Enum.map(row, &convert_value/1) + end) + end + + @doc """ + Converts a single value, handling hugeint tuples. + + See `convert_hugeints/1` for details. + """ + @spec convert_value(term()) :: term() + def convert_value({upper, lower}) when is_integer(upper) and is_integer(lower) do + Duckdbex.hugeint_to_integer({upper, lower}) + end + + def convert_value(value), do: value + + # ============================================================================ + # Private Functions + # ============================================================================ + + defp open_database(database, nil), do: Duckdbex.open(database) + defp open_database(database, config), do: Duckdbex.open(database, config) + + defp execute_query(conn, sql, []), do: Duckdbex.query(conn, sql) + defp execute_query(conn, sql, params), do: Duckdbex.query(conn, sql, params) + end +end diff --git a/lib/sql_kit/duckdb/pool.ex b/lib/sql_kit/duckdb/pool.ex new file mode 100644 index 0000000..fb01df8 --- /dev/null +++ b/lib/sql_kit/duckdb/pool.ex @@ -0,0 +1,564 @@ +if Code.ensure_loaded?(Duckdbex) do + defmodule SqlKit.DuckDB.Pool do + @moduledoc """ + A supervised connection pool for DuckDB. + + This module manages a pool of DuckDB connections with proper lifecycle + management. The database is opened once and shared across all connections + in the pool, and is properly released when the pool terminates. + + ## Usage + + Add the pool to your application's supervision tree: + + children = [ + {SqlKit.DuckDB.Pool, + name: MyApp.AnalyticsPool, + database: "priv/analytics.duckdb", + pool_size: 4} + ] + + Then use the pool with SqlKit functions using the pool reference: + + pool = SqlKit.DuckDB.Pool.pool(MyApp.AnalyticsPool) + SqlKit.query_all!(pool, "SELECT * FROM events", []) + + Or get the pool reference from start_link: + + {:ok, pool} = SqlKit.DuckDB.Pool.start_link(name: MyPool, database: ":memory:") + SqlKit.query_all!(pool, "SELECT * FROM events", []) + + ## Options + + - `:name` - Required. The name to register the pool under (atom) + - `:database` - Required. Path to database file or `":memory:"` + - `:pool_size` - Number of connections. Default: 4 + - `:config` - Optional `Duckdbex.Config` struct for advanced configuration + + ## Architecture + + The pool is implemented as a supervision tree: + - A Supervisor manages the overall lifecycle + - A Database GenServer holds the database reference and releases it on terminate + - A NimblePool manages the individual connections + + This ensures the database is properly released when the pool stops, avoiding + resource leaks. + + ## Why Pool Connections? + + Based on DuckDB's concurrency model: + - Each connection locks during query execution + - Multiple connections enable parallel query execution + - Connection reuse is critical - disconnecting loses the in-memory cache + - Recommended: Pool of ~4 connections for typical workloads + + ## Pool Behavior + + - **Lazy initialization**: Connections are created on-demand when first checked out, + not at pool startup. This avoids startup latency. + - **Prepared statement caching**: The pool caches prepared statements per connection. + Repeated queries with the same SQL skip the prepare step. + - **Checkout timeout**: All checkout operations have a configurable timeout + (default: 5000ms). If no connection is available within the timeout, + a `NimblePool.TimeoutError` is raised. + """ + + use Supervisor + + alias SqlKit.DuckDB.Connection + + defstruct [:name, :pid] + + @type t :: %__MODULE__{name: atom(), pid: pid()} + + @default_pool_size 4 + + @doc """ + Creates a pool reference struct from a pool name. + + Use this to get a reference that can be passed to SqlKit functions. + + ## Example + + pool = SqlKit.DuckDB.Pool.pool(MyApp.AnalyticsPool) + SqlKit.query_all!(pool, "SELECT * FROM events", []) + """ + @spec pool(atom()) :: t() + def pool(name) when is_atom(name) do + case Process.whereis(sup_name(name)) do + nil -> raise "DuckDB pool #{inspect(name)} is not started" + pid -> %__MODULE__{name: name, pid: pid} + end + end + + @doc """ + Returns a child specification for the pool. + + Used by supervisors to start the pool as part of a supervision tree. + """ + def child_spec(opts) do + name = Keyword.fetch!(opts, :name) + + %{ + id: name, + start: {__MODULE__, :start_link, [opts]}, + type: :supervisor + } + end + + @doc """ + Starts the connection pool. + + Returns `{:ok, pool}` where `pool` is a struct that can be passed + directly to SqlKit functions. + + ## Options + + - `:name` - Required. The name to register the pool under + - `:database` - Required. Path to database file or `":memory:"` + - `:pool_size` - Number of connections. Default: 4 + - `:config` - Optional `Duckdbex.Config` struct + + ## Note on In-Memory Databases + + For in-memory databases, all pool connections share the same database + instance. This ensures data created on one connection is visible to others. + """ + @spec start_link(keyword()) :: {:ok, t()} | {:error, term()} + def start_link(opts) do + name = Keyword.fetch!(opts, :name) + + case Supervisor.start_link(__MODULE__, opts, name: sup_name(name)) do + {:ok, sup_pid} -> + {:ok, %__MODULE__{name: name, pid: sup_pid}} + + {:error, _} = error -> + error + end + end + + @impl Supervisor + def init(opts) do + name = Keyword.fetch!(opts, :name) + database = Keyword.fetch!(opts, :database) + pool_size = Keyword.get(opts, :pool_size, @default_pool_size) + config = Keyword.get(opts, :config) + + children = [ + # Database holder - opens db and releases on terminate + {__MODULE__.Database, name: db_name(name), database: database, config: config}, + # NimblePool for connection management + %{ + id: name, + start: + {NimblePool, :start_link, + [ + [ + worker: {__MODULE__.Worker, db_name(name)}, + pool_size: pool_size, + name: name + ] + ]} + } + ] + + # rest_for_one: if Database crashes, restart NimblePool too + # This ensures workers always have a valid db reference + Supervisor.init(children, strategy: :rest_for_one) + end + + @doc """ + Executes a SQL query using prepared statement caching. + + Prepared statements are cached per connection in the pool. Repeated queries + with the same SQL will reuse the prepared statement, skipping the prepare step. + + ## Options + + - `:cache` - Whether to use prepared statement caching. Default: `true` + - `:timeout` - Checkout timeout in milliseconds. Default: `5000` + + ## Examples + + pool = SqlKit.DuckDB.Pool.pool(MyPool) + SqlKit.DuckDB.Pool.query!(pool, "SELECT * FROM events WHERE id = $1", [1]) + # => {["id", "name"], [[1, "click"]]} + + # With custom timeout + SqlKit.DuckDB.Pool.query!(pool, sql, params, timeout: 10_000) + + """ + @spec query!(t() | atom(), String.t(), list(), keyword()) :: + {[String.t()], [[term()]]} + def query!(pool, sql, params, opts \\ []) + + def query!(%__MODULE__{name: name}, sql, params, opts), do: query!(name, sql, params, opts) + + def query!(pool_name, sql, params, opts) when is_atom(pool_name) do + cache_enabled = Keyword.get(opts, :cache, true) + timeout = Keyword.get(opts, :timeout, 5000) + + # NimblePool.checkout!/4 callback must return {result, checkin_status} where: + # - result: returned to caller + # - checkin_status: :ok (healthy), :error (remove), or {:ok, new_state} (update) + NimblePool.checkout!( + pool_name, + :checkout, + fn _from, conn_state -> + if cache_enabled do + execute_cached!(conn_state, sql, params) + else + execute_uncached!(conn_state, sql, params) + end + end, + timeout + ) + end + + @doc """ + Executes a SQL query using prepared statement caching. + + Returns `{:ok, {columns, rows}}` on success, `{:error, reason}` on failure. + + See `query!/4` for options. + """ + @spec query(t() | atom(), String.t(), list(), keyword()) :: + {:ok, {[String.t()], [[term()]]}} | {:error, term()} + def query(pool, sql, params, opts \\ []) do + {:ok, query!(pool, sql, params, opts)} + rescue + e -> {:error, e} + end + + # Execute with prepared statement caching. + # Returns {result, {:ok, updated_state}} to update the worker's cache. + defp execute_cached!(conn_state, sql, params) do + %{conn: conn, prepared_cache: cache} = conn_state + + {stmt, updated_cache} = + case Map.fetch(cache, sql) do + {:ok, stmt} -> + {stmt, cache} + + :error -> + {:ok, stmt} = Duckdbex.prepare_statement(conn, sql) + {stmt, Map.put(cache, sql, stmt)} + end + + {:ok, result_ref} = execute_statement(stmt, params) + columns = Duckdbex.columns(result_ref) + rows = Duckdbex.fetch_all(result_ref) + result = {columns, SqlKit.DuckDB.convert_hugeints(rows)} + + updated_state = %{conn_state | prepared_cache: updated_cache} + {result, {:ok, updated_state}} + end + + # Execute without caching. + # Returns {result, :ok} to return connection unchanged. + defp execute_uncached!(conn_state, sql, params) do + %{conn: conn} = conn_state + + {:ok, result_ref} = + if params == [] do + Duckdbex.query(conn, sql) + else + Duckdbex.query(conn, sql, params) + end + + columns = Duckdbex.columns(result_ref) + rows = Duckdbex.fetch_all(result_ref) + result = {columns, SqlKit.DuckDB.convert_hugeints(rows)} + + {result, :ok} + end + + defp execute_statement(stmt, []), do: Duckdbex.execute_statement(stmt) + defp execute_statement(stmt, params), do: Duckdbex.execute_statement(stmt, params) + + @doc """ + Checks out a connection from the pool and executes a function. + + The connection is automatically returned to the pool after the function + completes, even if it raises an exception. + + Note: For queries, prefer using `query!/4` which supports prepared statement + caching. Use `checkout!/2` for operations that need direct connection access. + + ## Options + + - `:timeout` - Checkout timeout in milliseconds. Default: `5000` + + ## Examples + + pool = SqlKit.DuckDB.Pool.pool(MyPool) + SqlKit.DuckDB.Pool.checkout!(pool, fn conn -> + SqlKit.DuckDB.query!(conn, "SELECT * FROM events", []) + end) + + """ + @spec checkout!(t() | atom(), (Connection.t() -> result), keyword()) :: result + when result: term() + def checkout!(pool, fun, opts \\ []) + + def checkout!(%__MODULE__{name: name}, fun, opts), do: checkout!(name, fun, opts) + + def checkout!(pool_name, fun, opts) when is_atom(pool_name) and is_function(fun, 1) do + timeout = Keyword.get(opts, :timeout, 5000) + + # NimblePool callback returns {result, checkin_status} + # :ok means connection is healthy, return to pool unchanged + NimblePool.checkout!( + pool_name, + :checkout, + fn _from, conn_state -> + conn = %Connection{db: conn_state.db, conn: conn_state.conn} + result = fun.(conn) + {result, :ok} + end, + timeout + ) + end + + @doc """ + Executes a SQL query and streams results through a callback function. + + The connection is held for the duration of the callback, which receives + a stream of result chunks. This is useful for processing large result + sets without loading everything into memory. + + ## Options + + - `:timeout` - Checkout timeout in milliseconds. Default: `5000` + + ## Examples + + pool = SqlKit.DuckDB.Pool.pool(MyPool) + + # Process large result set in chunks + SqlKit.DuckDB.Pool.with_stream!(pool, "SELECT * FROM large_table", [], fn stream -> + stream + |> Stream.flat_map(& &1) + |> Enum.reduce(0, fn _row, count -> count + 1 end) + end) + # => 1000000 + + # Take first 100 rows + SqlKit.DuckDB.Pool.with_stream!(pool, "SELECT * FROM events", [], fn stream -> + stream + |> Stream.flat_map(& &1) + |> Enum.take(100) + end) + + ## Notes + + - The connection is checked out for the entire duration of the callback + - The callback must fully consume or abandon the stream before returning + - Hugeint values are automatically converted to Elixir integers + + """ + @spec with_stream!(t() | atom(), String.t(), list(), (Enumerable.t() -> result), keyword()) :: + result + when result: term() + def with_stream!(pool, sql, params, fun, opts \\ []) + + def with_stream!(%__MODULE__{name: name}, sql, params, fun, opts) do + with_stream!(name, sql, params, fun, opts) + end + + def with_stream!(pool_name, sql, params, fun, opts) when is_atom(pool_name) and is_function(fun, 1) do + timeout = Keyword.get(opts, :timeout, 5000) + + NimblePool.checkout!( + pool_name, + :checkout, + fn _from, conn_state -> + %{conn: conn} = conn_state + + {:ok, result_ref} = + if params == [] do + Duckdbex.query(conn, sql) + else + Duckdbex.query(conn, sql, params) + end + + stream = build_chunk_stream(result_ref) + result = fun.(stream) + {result, :ok} + end, + timeout + ) + end + + @doc """ + Like `with_stream!/5` but also provides column names to the callback. + + The callback receives `{columns, stream}` where `columns` is a list of + column names and `stream` is the chunk stream. + + ## Options + + - `:timeout` - Checkout timeout in milliseconds. Default: `5000` + + ## Examples + + SqlKit.DuckDB.Pool.with_stream_and_columns!(pool, "SELECT * FROM users", [], fn {cols, stream} -> + IO.inspect(cols) # => ["id", "name", "age"] + stream |> Stream.flat_map(& &1) |> Enum.to_list() + end) + + """ + @spec with_stream_and_columns!( + t() | atom(), + String.t(), + list(), + ({[String.t()], Enumerable.t()} -> result), + keyword() + ) :: result + when result: term() + def with_stream_and_columns!(pool, sql, params, fun, opts \\ []) + + def with_stream_and_columns!(%__MODULE__{name: name}, sql, params, fun, opts) do + with_stream_and_columns!(name, sql, params, fun, opts) + end + + def with_stream_and_columns!(pool_name, sql, params, fun, opts) when is_atom(pool_name) and is_function(fun, 1) do + timeout = Keyword.get(opts, :timeout, 5000) + + NimblePool.checkout!( + pool_name, + :checkout, + fn _from, conn_state -> + %{conn: conn} = conn_state + + {:ok, result_ref} = + if params == [] do + Duckdbex.query(conn, sql) + else + Duckdbex.query(conn, sql, params) + end + + columns = Duckdbex.columns(result_ref) + stream = build_chunk_stream(result_ref) + result = fun.({columns, stream}) + {result, :ok} + end, + timeout + ) + end + + # Builds a stream that fetches chunks from a result reference + defp build_chunk_stream(result_ref) do + Stream.resource( + fn -> result_ref end, + fn ref -> + case Duckdbex.fetch_chunk(ref) do + [] -> {:halt, ref} + chunk -> {[SqlKit.DuckDB.convert_hugeints(chunk)], ref} + end + end, + fn _ref -> :ok end + ) + end + + # Private helpers for naming + defp sup_name(name), do: Module.concat(name, Supervisor) + defp db_name(name), do: Module.concat(name, Database) + end + + defmodule SqlKit.DuckDB.Pool.Database do + @moduledoc false + # Internal GenServer that holds the DuckDB database reference. + # Releases the database properly on terminate to avoid resource leaks. + + use GenServer + + def start_link(opts) do + name = Keyword.fetch!(opts, :name) + GenServer.start_link(__MODULE__, opts, name: name) + end + + @doc """ + Gets the database reference from the holder. + """ + def get_db(name), do: GenServer.call(name, :get_db) + + @impl GenServer + def init(opts) do + database = Keyword.fetch!(opts, :database) + config = Keyword.get(opts, :config) + + case open_database(database, config) do + {:ok, db} -> {:ok, db} + {:error, reason} -> {:stop, reason} + end + end + + @impl GenServer + def handle_call(:get_db, _from, db) do + {:reply, db, db} + end + + @impl GenServer + def terminate(_reason, db) do + Duckdbex.release(db) + end + + defp open_database(database, nil), do: Duckdbex.open(database) + defp open_database(database, config), do: Duckdbex.open(database, config) + end + + defmodule SqlKit.DuckDB.Pool.Worker do + @moduledoc false + # NimblePool worker behaviour implementation. + # Creates connections to the shared database managed by Database GenServer. + # + # Each worker maintains a prepared statement cache for repeated queries. + # The cache stores SQL -> stmt_ref mappings, allowing subsequent executions + # of the same SQL to skip the prepare step. + + @behaviour NimblePool + + alias SqlKit.DuckDB.Pool.Database + + @impl NimblePool + def init_worker(db_name) do + # Async initialization to avoid blocking pool startup + {:async, + fn -> + db = Database.get_db(db_name) + {:ok, conn} = Duckdbex.connection(db) + %{db: db, conn: conn, prepared_cache: %{}} + end, db_name} + end + + @impl NimblePool + def handle_checkout(:checkout, _from, conn_state, pool_state) do + {:ok, conn_state, conn_state, pool_state} + end + + @impl NimblePool + def handle_checkin({:ok, updated_state}, _from, _conn_state, pool_state) do + # Accept updated state (e.g., with new cached statements) + {:ok, updated_state, pool_state} + end + + def handle_checkin(:ok, _from, conn_state, pool_state) do + {:ok, conn_state, pool_state} + end + + def handle_checkin(:error, _from, _conn_state, pool_state) do + # Connection had an error - remove it and let NimblePool create a new one + {:remove, :connection_error, pool_state} + end + + @impl NimblePool + def terminate_worker(_reason, _conn_state, pool_state) do + # Individual connections don't need explicit cleanup + # The database is released by the Database GenServer + # Prepared statement refs are cleaned up with the connection + {:ok, pool_state} + end + end +end diff --git a/lib/sql_kit/query.ex b/lib/sql_kit/query.ex index acc17ab..84eb005 100644 --- a/lib/sql_kit/query.ex +++ b/lib/sql_kit/query.ex @@ -5,10 +5,9 @@ defmodule SqlKit.Query do @doc """ Executes SQL and returns all rows as a list of maps or structs. """ - @spec all!(Ecto.Repo.t(), String.t(), list() | map(), keyword()) :: [map() | struct()] - def all!(repo, sql, params \\ [], opts \\ []) do - result = repo.query!(sql, params) - {columns, rows} = SqlKit.extract_result(result) + @spec all!(backend :: term(), String.t(), list() | map(), keyword()) :: [map() | struct()] + def all!(backend, sql, params \\ [], opts \\ []) do + {columns, rows} = execute!(backend, sql, params) SqlKit.transform_rows(columns, rows, opts) end @@ -17,10 +16,10 @@ defmodule SqlKit.Query do Returns `{:ok, results}` on success, `{:error, exception}` on failure. """ - @spec all(Ecto.Repo.t(), String.t(), list() | map(), keyword()) :: + @spec all(backend :: term(), String.t(), list() | map(), keyword()) :: {:ok, [map() | struct()]} | {:error, term()} - def all(repo, sql, params \\ [], opts \\ []) do - {:ok, all!(repo, sql, params, opts)} + def all(backend, sql, params \\ [], opts \\ []) do + {:ok, all!(backend, sql, params, opts)} rescue e -> {:error, e} end @@ -31,11 +30,11 @@ defmodule SqlKit.Query do Raises `SqlKit.NoResultsError` if no rows are returned. Raises `SqlKit.MultipleResultsError` if more than one row is returned. """ - @spec one!(Ecto.Repo.t(), String.t(), list() | map(), keyword()) :: map() | struct() - def one!(repo, sql, params \\ [], opts \\ []) do + @spec one!(backend :: term(), String.t(), list() | map(), keyword()) :: map() | struct() + def one!(backend, sql, params \\ [], opts \\ []) do query_name = Keyword.get(opts, :query_name) || truncate_sql(sql) - case all!(repo, sql, params, opts) do + case all!(backend, sql, params, opts) do [] -> raise SqlKit.NoResultsError, query: query_name @@ -53,12 +52,12 @@ defmodule SqlKit.Query do Returns `{:ok, result}` on exactly one result, `{:ok, nil}` on no results, or `{:error, exception}` on multiple results or other errors. """ - @spec one(Ecto.Repo.t(), String.t(), list() | map(), keyword()) :: + @spec one(backend :: term(), String.t(), list() | map(), keyword()) :: {:ok, map() | struct() | nil} | {:error, term()} - def one(repo, sql, params \\ [], opts \\ []) do + def one(backend, sql, params \\ [], opts \\ []) do query_name = Keyword.get(opts, :query_name) || truncate_sql(sql) - case all(repo, sql, params, opts) do + case all(backend, sql, params, opts) do {:ok, []} -> {:ok, nil} @@ -73,6 +72,39 @@ defmodule SqlKit.Query do end end + # ============================================================================ + # Backend Detection and Execution + # ============================================================================ + + # Execute query based on backend type + defp execute!(backend, sql, params) + + # DuckDB direct connection and pool support (conditionally compiled) + if Code.ensure_loaded?(Duckdbex) do + alias SqlKit.DuckDB.Pool + + defp execute!(%SqlKit.DuckDB.Connection{} = conn, sql, params) do + # Direct connections don't use caching (simpler, users manage their own) + SqlKit.DuckDB.query!(conn, sql, params) + end + + defp execute!(%Pool{} = pool, sql, params) do + # Pool queries use prepared statement caching by default + Pool.query!(pool, sql, params) + end + end + + # Fallback - assume Ecto repo + defp execute!(repo, sql, params) do + execute_ecto!(repo, sql, params) + end + + # Execute against Ecto repo + defp execute_ecto!(repo, sql, params) do + result = repo.query!(sql, params) + SqlKit.extract_result(result) + end + defp truncate_sql(sql) do if String.length(sql) > 50 do String.slice(sql, 0, 49) <> "..." diff --git a/mix.exs b/mix.exs index 8951c90..688505f 100644 --- a/mix.exs +++ b/mix.exs @@ -62,6 +62,8 @@ defmodule SqlKit.MixProject do defp deps do [ {:ecto_sql, "~> 3.0"}, + {:nimble_pool, "~> 1.1"}, + {:duckdbex, "~> 0.3.19", optional: true}, {:postgrex, "~> 0.19", optional: true}, {:myxql, "~> 0.7", optional: true}, {:ecto_sqlite3, "~> 0.18", optional: true}, diff --git a/mix.lock b/mix.lock index 6aef4cc..6555b3a 100644 --- a/mix.lock +++ b/mix.lock @@ -6,6 +6,7 @@ "db_connection": {:hex, :db_connection, "2.8.1", "9abdc1e68c34c6163f6fb96a96532272d13ad7ca45262156ae8b7ec6d9dc4bec", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a61a3d489b239d76f326e03b98794fb8e45168396c925ef25feb405ed09da8fd"}, "decimal": {:hex, :decimal, "2.3.0", "3ad6255aa77b4a3c4f818171b12d237500e63525c2fd056699967a3e7ea20f62", [:mix], [], "hexpm", "a4d66355cb29cb47c3cf30e71329e58361cfcb37c34235ef3bf1d7bf3773aeac"}, "dialyxir": {:hex, :dialyxir, "1.4.7", "dda948fcee52962e4b6c5b4b16b2d8fa7d50d8645bbae8b8685c3f9ecb7f5f4d", [:mix], [{:erlex, ">= 0.2.8", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b34527202e6eb8cee198efec110996c25c5898f43a4094df157f8d28f27d9efe"}, + "duckdbex": {:hex, :duckdbex, "0.3.19", "17010ca46074229e84b3a80764b1b9baa27f0a3f1529a811b5544e223bb2dfd2", [:make, :mix], [{:cc_precompiler, "~> 0.1", [hex: :cc_precompiler, repo: "hexpm", optional: false]}, {:elixir_make, "~> 0.8", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "f3c8f53e030e44b2802056997fae839acb2641baadae09aaf47dd2e1b3555bb9"}, "earmark_parser": {:hex, :earmark_parser, "1.4.44", "f20830dd6b5c77afe2b063777ddbbff09f9759396500cdbe7523efd58d7a339c", [:mix], [], "hexpm", "4778ac752b4701a5599215f7030989c989ffdc4f6df457c5f36938cc2d2a2750"}, "ecto": {:hex, :ecto, "3.13.5", "9d4a69700183f33bf97208294768e561f5c7f1ecf417e0fa1006e4a91713a834", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "df9efebf70cf94142739ba357499661ef5dbb559ef902b68ea1f3c1fabce36de"}, "ecto_ch": {:hex, :ecto_ch, "0.8.3", "7f4fe5cb3cbcba4aa238eac85581190b83685e6860e0b5961454ad140227e11e", [:mix], [{:ch, "~> 0.5.0 or ~> 0.6.0", [hex: :ch, repo: "hexpm", optional: false]}, {:ecto_sql, "~> 3.13.0", [hex: :ecto_sql, repo: "hexpm", optional: false]}], "hexpm", "5a0b964d71179fadd41cf46f83c7ca0b1c9350ab1b8951408d4301cb98836cc2"}, @@ -25,6 +26,7 @@ "mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"}, "myxql": {:hex, :myxql, "0.8.0", "60c60e87c7320d2f5759416aa1758c8e7534efbae07b192861977f8455e35acd", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:geo, "~> 3.4 or ~> 4.0", [hex: :geo, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "1ec0ceb26fb3cd0f8756519cf4f0e4f9348177a020705223bdf4742a2c44d774"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"}, + "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, "postgrex": {:hex, :postgrex, "0.21.1", "2c5cc830ec11e7a0067dd4d623c049b3ef807e9507a424985b8dcf921224cd88", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "27d8d21c103c3cc68851b533ff99eef353e6a0ff98dc444ea751de43eb48bdac"}, "sobelow": {:hex, :sobelow, "0.14.1", "2f81e8632f15574cba2402bcddff5497b413c01e6f094bc0ab94e83c2f74db81", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8fac9a2bd90fdc4b15d6fca6e1608efb7f7c600fa75800813b794ee9364c87f2"}, "styler": {:hex, :styler, "1.10.0", "343f1f7bb19a8893c2841a9ae90665b68d2edf6cc37b964a5099e60c78815c2e", [:mix], [], "hexpm", "6a78876611869466139e63722df4cbbb56b18a842d88c19f23ca844d914ad91a"}, diff --git a/test/sql_kit/duckdb_test.exs b/test/sql_kit/duckdb_test.exs new file mode 100644 index 0000000..b3767ed --- /dev/null +++ b/test/sql_kit/duckdb_test.exs @@ -0,0 +1,1035 @@ +defmodule SqlKit.DuckDBTest do + use ExUnit.Case, async: true + + alias SqlKit.DuckDB + alias SqlKit.DuckDB.Connection + alias SqlKit.DuckDB.Pool + alias SqlKit.Test.DuckDBPool + alias SqlKit.Test.DuckDBSQL + alias SqlKit.Test.User + + # Define atoms used in queries so to_existing_atom works + _ = [:num, :name, :age, :value, :id, :greeting, :email] + + describe "SqlKit.DuckDB.connect/1" do + test "connects to in-memory database" do + assert {:ok, %Connection{}} = DuckDB.connect(":memory:") + end + + test "connect! returns connection directly" do + conn = DuckDB.connect!(":memory:") + assert %Connection{} = conn + assert :ok = DuckDB.disconnect(conn) + end + + test "connects to file-based database" do + path = Path.join(System.tmp_dir!(), "test_#{:erlang.unique_integer([:positive])}.duckdb") + + try do + assert {:ok, %Connection{}} = DuckDB.connect(path) + assert File.exists?(path) + after + File.rm(path) + end + end + end + + describe "SqlKit.DuckDB.disconnect/1" do + test "closes connection cleanly" do + {:ok, conn} = DuckDB.connect(":memory:") + assert :ok = DuckDB.disconnect(conn) + end + end + + describe "SqlKit.DuckDB.query/3" do + setup do + {:ok, conn} = DuckDB.connect(":memory:") + on_exit(fn -> DuckDB.disconnect(conn) end) + %{conn: conn} + end + + test "executes simple SELECT", %{conn: conn} do + assert {:ok, {["num"], [[1]]}} = DuckDB.query(conn, "SELECT 1 as num", []) + end + + test "executes SELECT with multiple columns", %{conn: conn} do + assert {:ok, {columns, rows}} = DuckDB.query(conn, "SELECT 1 as id, 'hello' as greeting", []) + assert columns == ["id", "greeting"] + assert rows == [[1, "hello"]] + end + + test "executes parameterized query with integer", %{conn: conn} do + # Note: DuckDB requires explicit cast for integer literals in params + assert {:ok, {["num"], [[42]]}} = DuckDB.query(conn, "SELECT $1::INTEGER as num", [42]) + end + + test "handles multiple parameters", %{conn: conn} do + assert {:ok, {columns, [[3, 2]]}} = + DuckDB.query(conn, "SELECT $1::INTEGER + $2::INTEGER as sum, $1::INTEGER * $2::INTEGER as product", [1, 2]) + + assert "sum" in columns + assert "product" in columns + end + + test "returns empty rows for no results", %{conn: conn} do + DuckDB.query!(conn, "CREATE TABLE empty_test (id INTEGER)", []) + assert {:ok, {["id"], []}} = DuckDB.query(conn, "SELECT id FROM empty_test", []) + end + + test "handles NULL values", %{conn: conn} do + assert {:ok, {["value"], [[nil]]}} = DuckDB.query(conn, "SELECT NULL as value", []) + end + + test "handles boolean values", %{conn: conn} do + assert {:ok, {_, [[true, false]]}} = + DuckDB.query(conn, "SELECT true as t, false as f", []) + end + + test "handles float values", %{conn: conn} do + # DuckDB returns literal decimals as DECIMAL tuples {value, precision, scale} + # Use CAST to get a proper DOUBLE + assert {:ok, {["num"], [[3.14]]}} = DuckDB.query(conn, "SELECT CAST(3.14 AS DOUBLE) as num", []) + end + + test "returns error for invalid SQL", %{conn: conn} do + assert {:error, _} = DuckDB.query(conn, "INVALID SQL", []) + end + end + + describe "SqlKit.DuckDB.query!/3" do + setup do + {:ok, conn} = DuckDB.connect(":memory:") + on_exit(fn -> DuckDB.disconnect(conn) end) + %{conn: conn} + end + + test "returns result directly on success", %{conn: conn} do + assert {["num"], [[1]]} = DuckDB.query!(conn, "SELECT 1 as num", []) + end + + test "raises on error", %{conn: conn} do + assert_raise RuntimeError, ~r/DuckDB query failed/, fn -> + DuckDB.query!(conn, "INVALID SQL", []) + end + end + end + + describe "SqlKit standalone functions with DuckDB connection" do + setup do + {:ok, conn} = DuckDB.connect(":memory:") + + # Create and populate test table + DuckDB.query!(conn, "CREATE TABLE users (id INTEGER, name VARCHAR, age INTEGER)", []) + DuckDB.query!(conn, "INSERT INTO users VALUES (1, 'Alice', 30)", []) + DuckDB.query!(conn, "INSERT INTO users VALUES (2, 'Bob', 25)", []) + DuckDB.query!(conn, "INSERT INTO users VALUES (3, 'Charlie', 35)", []) + + on_exit(fn -> DuckDB.disconnect(conn) end) + %{conn: conn} + end + + test "query_all! returns list of maps", %{conn: conn} do + results = SqlKit.query_all!(conn, "SELECT * FROM users ORDER BY id", []) + + assert length(results) == 3 + assert hd(results).id == 1 + assert hd(results).name == "Alice" + end + + test "query_all! with parameters", %{conn: conn} do + results = SqlKit.query_all!(conn, "SELECT * FROM users WHERE age > $1", [26]) + + assert length(results) == 2 + names = Enum.map(results, & &1.name) + assert "Alice" in names + assert "Charlie" in names + end + + test "query_all! with :as option", %{conn: conn} do + results = SqlKit.query_all!(conn, "SELECT id, name, age FROM users ORDER BY id", [], as: User) + + assert length(results) == 3 + assert %User{id: 1, name: "Alice", age: 30} = hd(results) + end + + test "query_all returns {:ok, results}", %{conn: conn} do + assert {:ok, results} = SqlKit.query_all(conn, "SELECT * FROM users ORDER BY id", []) + assert length(results) == 3 + end + + test "query_all returns {:error, _} on error", %{conn: conn} do + assert {:error, _} = SqlKit.query_all(conn, "SELECT * FROM nonexistent", []) + end + + test "query_one! returns single map", %{conn: conn} do + result = SqlKit.query_one!(conn, "SELECT * FROM users WHERE id = $1", [1]) + + assert result.id == 1 + assert result.name == "Alice" + end + + test "query_one! raises NoResultsError", %{conn: conn} do + assert_raise SqlKit.NoResultsError, fn -> + SqlKit.query_one!(conn, "SELECT * FROM users WHERE id = $1", [999]) + end + end + + test "query_one! raises MultipleResultsError", %{conn: conn} do + assert_raise SqlKit.MultipleResultsError, fn -> + SqlKit.query_one!(conn, "SELECT * FROM users", []) + end + end + + test "query_one returns {:ok, result}", %{conn: conn} do + assert {:ok, result} = SqlKit.query_one(conn, "SELECT * FROM users WHERE id = $1", [1]) + assert result.name == "Alice" + end + + test "query_one returns {:ok, nil} when no results", %{conn: conn} do + assert {:ok, nil} = SqlKit.query_one(conn, "SELECT * FROM users WHERE id = $1", [999]) + end + + test "query!/4 is alias for query_one!/4", %{conn: conn} do + result = SqlKit.query!(conn, "SELECT * FROM users WHERE id = $1", [1]) + assert result.name == "Alice" + end + + test "query/4 is alias for query_one/4", %{conn: conn} do + assert {:ok, result} = SqlKit.query(conn, "SELECT * FROM users WHERE id = $1", [1]) + assert result.name == "Alice" + end + end + + describe "SqlKit.DuckDB.Pool" do + test "starts pool with in-memory database" do + pool_name = :"test_pool_#{:erlang.unique_integer([:positive])}" + + {:ok, pool} = Pool.start_link(name: pool_name, database: ":memory:", pool_size: 2) + assert %Pool{name: ^pool_name, pid: pid} = pool + assert Process.alive?(pid) + + # Clean up - pid is the supervisor + Supervisor.stop(pid) + end + + test "checkout! executes function with connection" do + pool_name = :"test_pool_#{:erlang.unique_integer([:positive])}" + {:ok, pool} = Pool.start_link(name: pool_name, database: ":memory:", pool_size: 2) + + result = + Pool.checkout!(pool, fn conn -> + DuckDB.query!(conn, "SELECT 42 as num", []) + end) + + assert {["num"], [[42]]} = result + + Supervisor.stop(pool.pid) + end + + test "pool returns connection after checkout" do + pool_name = :"test_pool_#{:erlang.unique_integer([:positive])}" + {:ok, pool} = Pool.start_link(name: pool_name, database: ":memory:", pool_size: 1) + + # First checkout should work + Pool.checkout!(pool, fn conn -> + DuckDB.query!(conn, "SELECT 1", []) + end) + + # Second checkout should also work (connection was returned) + Pool.checkout!(pool, fn conn -> + DuckDB.query!(conn, "SELECT 2", []) + end) + + Supervisor.stop(pool.pid) + end + + test "database is properly released when pool stops" do + pool_name = :"test_pool_#{:erlang.unique_integer([:positive])}" + {:ok, pool} = Pool.start_link(name: pool_name, database: ":memory:", pool_size: 1) + + # Use the pool + Pool.checkout!(pool, fn conn -> + DuckDB.query!(conn, "SELECT 1", []) + end) + + # Stop the pool - this should release the database via terminate callback + Supervisor.stop(pool.pid) + + # Verify the supervisor is stopped + refute Process.alive?(pool.pid) + + # Verify the database holder is stopped + db_holder_name = Module.concat(pool_name, Database) + assert Process.whereis(db_holder_name) == nil + end + end + + describe "SqlKit standalone functions with DuckDB pool" do + setup do + pool_name = :"test_pool_#{:erlang.unique_integer([:positive])}" + {:ok, pool} = Pool.start_link(name: pool_name, database: ":memory:", pool_size: 2) + + # Set up test data + Pool.checkout!(pool, fn conn -> + DuckDB.query!(conn, "CREATE TABLE users (id INTEGER, name VARCHAR, age INTEGER)", []) + DuckDB.query!(conn, "INSERT INTO users VALUES (1, 'Alice', 30)", []) + DuckDB.query!(conn, "INSERT INTO users VALUES (2, 'Bob', 25)", []) + DuckDB.query!(conn, "INSERT INTO users VALUES (3, 'Charlie', 35)", []) + end) + + on_exit(fn -> + try do + if Process.alive?(pool.pid), do: Supervisor.stop(pool.pid) + catch + :exit, _ -> :ok + end + end) + + %{pool: pool} + end + + test "query_all! with pool", %{pool: pool} do + results = SqlKit.query_all!(pool, "SELECT * FROM users ORDER BY id", []) + + assert length(results) == 3 + assert hd(results).name == "Alice" + end + + test "query_all! with pool and parameters", %{pool: pool} do + results = SqlKit.query_all!(pool, "SELECT * FROM users WHERE age > $1", [26]) + + assert length(results) == 2 + end + + test "query_all! with pool and :as option", %{pool: pool} do + results = SqlKit.query_all!(pool, "SELECT id, name, age FROM users ORDER BY id", [], as: User) + + assert length(results) == 3 + assert %User{id: 1, name: "Alice", age: 30} = hd(results) + end + + test "query_one! with pool and :as option", %{pool: pool} do + result = SqlKit.query_one!(pool, "SELECT id, name, age FROM users WHERE id = $1", [1], as: User) + + assert %User{id: 1, name: "Alice", age: 30} = result + end + + test "query_one! with pool", %{pool: pool} do + result = SqlKit.query_one!(pool, "SELECT * FROM users WHERE id = $1", [1]) + + assert result.name == "Alice" + end + + test "query_one! with pool raises NoResultsError", %{pool: pool} do + assert_raise SqlKit.NoResultsError, fn -> + SqlKit.query_one!(pool, "SELECT * FROM users WHERE id = $1", [999]) + end + end + + test "query_all with pool returns {:ok, results}", %{pool: pool} do + assert {:ok, results} = SqlKit.query_all(pool, "SELECT * FROM users ORDER BY id", []) + assert length(results) == 3 + end + + test "query_one with pool returns {:ok, result}", %{pool: pool} do + assert {:ok, result} = SqlKit.query_one(pool, "SELECT * FROM users WHERE id = $1", [1]) + assert result.name == "Alice" + end + + test "query_one with pool returns {:ok, nil} when no results", %{pool: pool} do + assert {:ok, nil} = SqlKit.query_one(pool, "SELECT * FROM users WHERE id = $1", [999]) + end + end + + describe "DuckDB-specific features" do + setup do + {:ok, conn} = DuckDB.connect(":memory:") + on_exit(fn -> DuckDB.disconnect(conn) end) + %{conn: conn} + end + + test "handles DATE type", %{conn: conn} do + DuckDB.query!(conn, "CREATE TABLE dates (d DATE)", []) + DuckDB.query!(conn, "INSERT INTO dates VALUES ('2024-01-15')", []) + + result = SqlKit.query_one!(conn, "SELECT d FROM dates", []) + # DuckDB returns dates as tuples + assert result.d == {2024, 1, 15} + end + + test "handles TIMESTAMP type", %{conn: conn} do + DuckDB.query!(conn, "CREATE TABLE timestamps (ts TIMESTAMP)", []) + DuckDB.query!(conn, "INSERT INTO timestamps VALUES ('2024-01-15 10:30:00')", []) + + result = SqlKit.query_one!(conn, "SELECT ts FROM timestamps", []) + # DuckDB returns timestamps as nested tuples + assert {{2024, 1, 15}, {10, 30, 0, 0}} = result.ts + end + + test "handles LIST type", %{conn: conn} do + result = SqlKit.query_one!(conn, "SELECT [1, 2, 3] as nums", [], unsafe_atoms: true) + assert result.nums == [1, 2, 3] + end + + test "handles STRUCT type", %{conn: conn} do + result = + SqlKit.query_one!(conn, "SELECT {'name': 'Alice', 'age': 30} as person", [], unsafe_atoms: true) + + assert result.person == %{"age" => 30, "name" => "Alice"} + end + + test "loads extensions via SQL", %{conn: conn} do + # This just verifies the SQL-first approach works + # INSTALL may fail if extension already installed, that's ok + DuckDB.query(conn, "INSTALL 'json'", []) + assert {:ok, _} = DuckDB.query(conn, "LOAD 'json'", []) + end + end + + describe "file-based database persistence" do + setup do + # Create a unique temp file path for each test + path = Path.join(System.tmp_dir!(), "sqlkit_test_#{:erlang.unique_integer([:positive])}.duckdb") + + on_exit(fn -> + # Clean up database files (DuckDB may create .wal files too) + File.rm(path) + File.rm(path <> ".wal") + end) + + %{db_path: path} + end + + test "data persists across direct connections", %{db_path: path} do + # First connection: create table and insert data + {:ok, conn1} = DuckDB.connect(path) + DuckDB.query!(conn1, "CREATE TABLE persistence_test (id INTEGER, value VARCHAR)", []) + DuckDB.query!(conn1, "INSERT INTO persistence_test VALUES (1, 'hello')", []) + DuckDB.query!(conn1, "INSERT INTO persistence_test VALUES (2, 'world')", []) + DuckDB.disconnect(conn1) + + # Second connection: verify data persisted + {:ok, conn2} = DuckDB.connect(path) + results = SqlKit.query_all!(conn2, "SELECT * FROM persistence_test ORDER BY id", []) + + assert length(results) == 2 + assert Enum.at(results, 0).id == 1 + assert Enum.at(results, 0).value == "hello" + assert Enum.at(results, 1).id == 2 + assert Enum.at(results, 1).value == "world" + + DuckDB.disconnect(conn2) + end + + test "queries work with file-based pool", %{db_path: path} do + pool_name = :"file_pool_#{:erlang.unique_integer([:positive])}" + {:ok, pool} = Pool.start_link(name: pool_name, database: path, pool_size: 2) + + # Create table and insert data via pool + Pool.checkout!(pool, fn conn -> + DuckDB.query!(conn, "CREATE TABLE pool_file_test (id INTEGER, name VARCHAR)", []) + DuckDB.query!(conn, "INSERT INTO pool_file_test VALUES (1, 'Alice')", []) + DuckDB.query!(conn, "INSERT INTO pool_file_test VALUES (2, 'Bob')", []) + end) + + # Query via SqlKit API + results = SqlKit.query_all!(pool, "SELECT * FROM pool_file_test ORDER BY id", []) + + assert length(results) == 2 + assert hd(results).name == "Alice" + + Supervisor.stop(pool.pid) + end + + test "pool data persists across restarts", %{db_path: path} do + pool_name = :"persist_pool_#{:erlang.unique_integer([:positive])}" + + # First pool: create and populate + {:ok, pool1} = Pool.start_link(name: pool_name, database: path, pool_size: 2) + + Pool.checkout!(pool1, fn conn -> + DuckDB.query!(conn, "CREATE TABLE restart_test (id INTEGER, data VARCHAR)", []) + DuckDB.query!(conn, "INSERT INTO restart_test VALUES (1, 'persisted')", []) + end) + + # Verify data is there + result1 = SqlKit.query_one!(pool1, "SELECT * FROM restart_test WHERE id = $1", [1]) + assert result1.data == "persisted" + + # Stop the pool completely + Supervisor.stop(pool1.pid) + + # Verify pool is stopped + refute Process.alive?(pool1.pid) + + # Start a new pool with the same database file + {:ok, pool2} = Pool.start_link(name: pool_name, database: path, pool_size: 2) + + # Data should still be there + result2 = SqlKit.query_one!(pool2, "SELECT * FROM restart_test WHERE id = $1", [1]) + assert result2.data == "persisted" + + # Can also insert more data + Pool.checkout!(pool2, fn conn -> + DuckDB.query!(conn, "INSERT INTO restart_test VALUES (2, 'new_data')", []) + end) + + results = SqlKit.query_all!(pool2, "SELECT * FROM restart_test ORDER BY id", []) + assert length(results) == 2 + + Supervisor.stop(pool2.pid) + end + + test "file-based pool with :as option", %{db_path: path} do + pool_name = :"as_pool_#{:erlang.unique_integer([:positive])}" + {:ok, pool} = Pool.start_link(name: pool_name, database: path, pool_size: 2) + + Pool.checkout!(pool, fn conn -> + DuckDB.query!(conn, "CREATE TABLE users (id INTEGER, name VARCHAR, age INTEGER)", []) + DuckDB.query!(conn, "INSERT INTO users VALUES (1, 'Alice', 30)", []) + end) + + result = SqlKit.query_one!(pool, "SELECT id, name, age FROM users WHERE id = $1", [1], as: User) + assert %User{id: 1, name: "Alice", age: 30} = result + + Supervisor.stop(pool.pid) + end + end + + describe "file-based SQL with DuckDB" do + # These tests use the SqlKit.Test.DuckDBSQL module defined in test_sql_modules.ex + # which uses `backend: {:duckdb, pool: SqlKit.Test.DuckDBPool}` + + setup do + pool_name = DuckDBPool + {:ok, pool} = Pool.start_link(name: pool_name, database: ":memory:", pool_size: 2) + + # Set up test data matching the other database tests + Pool.checkout!(pool, fn conn -> + DuckDB.query!(conn, "CREATE TABLE users (id INTEGER, name VARCHAR, email VARCHAR, age INTEGER)", []) + DuckDB.query!(conn, "INSERT INTO users VALUES (1, 'Alice', 'alice@test.com', 30)", []) + DuckDB.query!(conn, "INSERT INTO users VALUES (2, 'Bob', 'bob@test.com', 25)", []) + DuckDB.query!(conn, "INSERT INTO users VALUES (3, 'Charlie', 'charlie@test.com', 35)", []) + end) + + on_exit(fn -> + try do + if Process.alive?(pool.pid), do: Supervisor.stop(pool.pid) + catch + :exit, _ -> :ok + end + end) + + %{pool: pool} + end + + test "load! returns SQL content", _context do + sql = DuckDBSQL.load!("all_users.sql") + assert sql =~ "SELECT" + assert sql =~ "FROM users" + end + + test "load returns {:ok, sql}", _context do + assert {:ok, sql} = DuckDBSQL.load("all_users.sql") + assert sql =~ "SELECT" + end + + test "query_all! returns all rows", _context do + results = DuckDBSQL.query_all!("all_users.sql") + + assert length(results) == 3 + assert hd(results).name == "Alice" + end + + test "query_all! with :as option", _context do + results = DuckDBSQL.query_all!("all_users.sql", [], as: User) + + assert length(results) == 3 + assert %User{id: 1, name: "Alice", email: "alice@test.com", age: 30} = hd(results) + end + + test "query_all! with parameters", _context do + results = DuckDBSQL.query_all!("users_by_age_range.sql", [26, 40]) + + assert length(results) == 2 + names = Enum.map(results, & &1.name) + assert "Alice" in names + assert "Charlie" in names + end + + test "query_all returns {:ok, results}", _context do + assert {:ok, results} = DuckDBSQL.query_all("all_users.sql") + assert length(results) == 3 + end + + test "query_one! returns single row", _context do + result = DuckDBSQL.query_one!("first_user.sql") + + assert result.id == 1 + assert result.name == "Alice" + end + + test "query_one! with parameter", _context do + result = DuckDBSQL.query_one!("user_by_id.sql", [2]) + + assert result.id == 2 + assert result.name == "Bob" + end + + test "query_one! with :as option", _context do + result = DuckDBSQL.query_one!("user_by_id.sql", [1], as: User) + + assert %User{id: 1, name: "Alice", email: "alice@test.com", age: 30} = result + end + + test "query_one! raises NoResultsError for no matches", _context do + assert_raise SqlKit.NoResultsError, fn -> + DuckDBSQL.query_one!("no_users.sql") + end + end + + test "query_one! raises MultipleResultsError for multiple matches", _context do + assert_raise SqlKit.MultipleResultsError, fn -> + DuckDBSQL.query_one!("all_users.sql") + end + end + + test "query_one returns {:ok, result}", _context do + assert {:ok, result} = DuckDBSQL.query_one("first_user.sql") + assert result.name == "Alice" + end + + test "query_one returns {:ok, nil} for no results", _context do + assert {:ok, nil} = DuckDBSQL.query_one("no_users.sql") + end + + test "query!/3 is alias for query_one!/3", _context do + result = DuckDBSQL.query!("user_by_id.sql", [1]) + assert result.name == "Alice" + end + + test "query/3 is alias for query_one/3", _context do + assert {:ok, result} = DuckDBSQL.query("user_by_id.sql", [1]) + assert result.name == "Alice" + end + end + + # ============================================================================ + # Phase 5: Prepared Statement Caching + # ============================================================================ + + describe "SqlKit.DuckDB.Pool.query!/4 with prepared statement caching" do + setup do + pool_name = :"cache_test_pool_#{:erlang.unique_integer([:positive])}" + {:ok, pool} = Pool.start_link(name: pool_name, database: ":memory:", pool_size: 1) + + Pool.checkout!(pool, fn conn -> + DuckDB.query!(conn, "CREATE TABLE cache_test (id INTEGER, value VARCHAR)", []) + DuckDB.query!(conn, "INSERT INTO cache_test VALUES (1, 'one')", []) + DuckDB.query!(conn, "INSERT INTO cache_test VALUES (2, 'two')", []) + DuckDB.query!(conn, "INSERT INTO cache_test VALUES (3, 'three')", []) + end) + + on_exit(fn -> + try do + if Process.alive?(pool.pid), do: Supervisor.stop(pool.pid) + catch + :exit, _ -> :ok + end + end) + + %{pool: pool} + end + + test "queries work with caching enabled (default)", %{pool: pool} do + # Run the same query multiple times + for _ <- 1..3 do + result = Pool.query!(pool, "SELECT * FROM cache_test WHERE id = $1", [1]) + assert {["id", "value"], [[1, "one"]]} = result + end + end + + test "repeated queries with same SQL use cached statement", %{pool: pool} do + sql = "SELECT * FROM cache_test WHERE id = $1" + + # First query - should prepare and cache + result1 = Pool.query!(pool, sql, [1]) + assert {["id", "value"], [[1, "one"]]} = result1 + + # Second query with different params - should use cached statement + result2 = Pool.query!(pool, sql, [2]) + assert {["id", "value"], [[2, "two"]]} = result2 + + # Third query - still cached + result3 = Pool.query!(pool, sql, [3]) + assert {["id", "value"], [[3, "three"]]} = result3 + end + + test "queries work with caching disabled", %{pool: pool} do + result = Pool.query!(pool, "SELECT * FROM cache_test WHERE id = $1", [1], cache: false) + assert {["id", "value"], [[1, "one"]]} = result + + # Run again without cache + result2 = Pool.query!(pool, "SELECT * FROM cache_test WHERE id = $1", [2], cache: false) + assert {["id", "value"], [[2, "two"]]} = result2 + end + + test "Pool.query/4 returns {:ok, result}", %{pool: pool} do + assert {:ok, {["id", "value"], [[1, "one"]]}} = + Pool.query(pool, "SELECT * FROM cache_test WHERE id = $1", [1]) + end + + test "Pool.query/4 returns {:error, _} on error", %{pool: pool} do + assert {:error, _} = Pool.query(pool, "INVALID SQL", []) + end + + test "SqlKit functions use cached pool queries", %{pool: pool} do + # This uses Pool.query! internally + results = SqlKit.query_all!(pool, "SELECT * FROM cache_test ORDER BY id", []) + assert length(results) == 3 + + # Run again - should use cached statement + results2 = SqlKit.query_all!(pool, "SELECT * FROM cache_test ORDER BY id", []) + assert length(results2) == 3 + end + end + + # ============================================================================ + # Phase 5: Streaming + # ============================================================================ + + describe "SqlKit.DuckDB.stream!/3 (direct connection)" do + setup do + {:ok, conn} = DuckDB.connect(":memory:") + + # Create table with many rows + DuckDB.query!(conn, "CREATE TABLE stream_test (id INTEGER, value VARCHAR)", []) + + for i <- 1..100 do + DuckDB.query!(conn, "INSERT INTO stream_test VALUES ($1, $2)", [i, "value_#{i}"]) + end + + on_exit(fn -> DuckDB.disconnect(conn) end) + %{conn: conn} + end + + test "returns a stream of result chunks", %{conn: conn} do + stream = DuckDB.stream!(conn, "SELECT * FROM stream_test ORDER BY id", []) + assert is_function(stream, 2) + + # Collect all rows + rows = + stream + |> Stream.flat_map(& &1) + |> Enum.to_list() + + assert length(rows) == 100 + assert hd(rows) == [1, "value_1"] + assert List.last(rows) == [100, "value_100"] + end + + test "can take limited rows from stream", %{conn: conn} do + rows = + conn + |> DuckDB.stream!("SELECT * FROM stream_test ORDER BY id", []) + |> Stream.flat_map(& &1) + |> Enum.take(10) + + assert length(rows) == 10 + assert hd(rows) == [1, "value_1"] + assert List.last(rows) == [10, "value_10"] + end + + test "stream_with_columns! returns columns and stream", %{conn: conn} do + {columns, stream} = DuckDB.stream_with_columns!(conn, "SELECT * FROM stream_test ORDER BY id", []) + + assert columns == ["id", "value"] + assert is_function(stream, 2) + + rows = stream |> Stream.flat_map(& &1) |> Enum.take(5) + assert length(rows) == 5 + end + + test "handles empty results", %{conn: conn} do + rows = + conn + |> DuckDB.stream!("SELECT * FROM stream_test WHERE id < 0", []) + |> Stream.flat_map(& &1) + |> Enum.to_list() + + assert rows == [] + end + + test "handles parameters in stream query", %{conn: conn} do + rows = + conn + |> DuckDB.stream!("SELECT * FROM stream_test WHERE id > $1 ORDER BY id", [95]) + |> Stream.flat_map(& &1) + |> Enum.to_list() + + assert length(rows) == 5 + assert hd(rows) == [96, "value_96"] + end + end + + describe "SqlKit.DuckDB.Pool.with_stream!/4" do + setup do + pool_name = :"stream_pool_#{:erlang.unique_integer([:positive])}" + {:ok, pool} = Pool.start_link(name: pool_name, database: ":memory:", pool_size: 1) + + Pool.checkout!(pool, fn conn -> + DuckDB.query!(conn, "CREATE TABLE stream_pool_test (id INTEGER, name VARCHAR)", []) + + for i <- 1..50 do + DuckDB.query!(conn, "INSERT INTO stream_pool_test VALUES ($1, $2)", [i, "name_#{i}"]) + end + end) + + on_exit(fn -> + try do + if Process.alive?(pool.pid), do: Supervisor.stop(pool.pid) + catch + :exit, _ -> :ok + end + end) + + %{pool: pool} + end + + test "streams results through callback", %{pool: pool} do + count = + Pool.with_stream!(pool, "SELECT * FROM stream_pool_test", [], fn stream -> + stream + |> Stream.flat_map(& &1) + |> Enum.count() + end) + + assert count == 50 + end + + test "can process limited rows", %{pool: pool} do + rows = + Pool.with_stream!(pool, "SELECT * FROM stream_pool_test ORDER BY id", [], fn stream -> + stream + |> Stream.flat_map(& &1) + |> Enum.take(10) + end) + + assert length(rows) == 10 + assert hd(rows) == [1, "name_1"] + end + + test "with_stream_and_columns! provides column names", %{pool: pool} do + {columns, first_row} = + Pool.with_stream_and_columns!( + pool, + "SELECT * FROM stream_pool_test ORDER BY id", + [], + fn {cols, stream} -> + row = stream |> Stream.flat_map(& &1) |> Enum.at(0) + {cols, row} + end + ) + + assert columns == ["id", "name"] + assert first_row == [1, "name_1"] + end + + test "handles parameters", %{pool: pool} do + count = + Pool.with_stream!(pool, "SELECT * FROM stream_pool_test WHERE id > $1", [40], fn stream -> + stream |> Stream.flat_map(& &1) |> Enum.count() + end) + + assert count == 10 + end + + test "connection is returned after stream processing", %{pool: pool} do + # Process stream + Pool.with_stream!(pool, "SELECT 1", [], fn stream -> + stream |> Stream.flat_map(& &1) |> Enum.to_list() + end) + + # Pool should still be usable (connection returned) + result = Pool.query!(pool, "SELECT 42 as num", []) + assert {["num"], [[42]]} = result + end + end + + describe "file-based SQL streaming with DuckDB" do + setup do + pool_name = DuckDBPool + {:ok, pool} = Pool.start_link(name: pool_name, database: ":memory:", pool_size: 2) + + Pool.checkout!(pool, fn conn -> + DuckDB.query!( + conn, + "CREATE TABLE users (id INTEGER, name VARCHAR, email VARCHAR, age INTEGER)", + [] + ) + + for i <- 1..20 do + DuckDB.query!( + conn, + "INSERT INTO users VALUES ($1, $2, $3, $4)", + [i, "User#{i}", "user#{i}@test.com", 20 + i] + ) + end + end) + + on_exit(fn -> + try do + if Process.alive?(pool.pid), do: Supervisor.stop(pool.pid) + catch + :exit, _ -> :ok + end + end) + + %{pool: pool} + end + + test "with_stream! works with file-based SQL", _context do + count = + DuckDBSQL.with_stream!("all_users.sql", [], fn stream -> + stream |> Stream.flat_map(& &1) |> Enum.count() + end) + + assert count == 20 + end + + test "with_stream_and_columns! works with file-based SQL", _context do + {columns, count} = + DuckDBSQL.with_stream_and_columns!("all_users.sql", [], fn {cols, stream} -> + cnt = stream |> Stream.flat_map(& &1) |> Enum.count() + {cols, cnt} + end) + + assert "id" in columns + assert "name" in columns + assert count == 20 + end + + test "with_stream! with parameters", _context do + rows = + DuckDBSQL.with_stream!("users_by_age_range.sql", [25, 30], fn stream -> + stream |> Stream.flat_map(& &1) |> Enum.to_list() + end) + + # Users with age 25-30 (ages are 21-40 based on 20+i) + refute Enum.empty?(rows) + end + end + + # ============================================================================ + # Phase 5: Pool Tuning Options + # ============================================================================ + + describe "pool timeout options" do + test "checkout! accepts timeout option" do + pool_name = :"timeout_pool_#{:erlang.unique_integer([:positive])}" + {:ok, pool} = Pool.start_link(name: pool_name, database: ":memory:", pool_size: 1) + + # Should work with explicit timeout + result = + Pool.checkout!( + pool, + fn conn -> + DuckDB.query!(conn, "SELECT 1 as num", []) + end, + timeout: 10_000 + ) + + assert {["num"], [[1]]} = result + Supervisor.stop(pool.pid) + end + + test "query! accepts timeout option" do + pool_name = :"timeout_pool_#{:erlang.unique_integer([:positive])}" + {:ok, pool} = Pool.start_link(name: pool_name, database: ":memory:", pool_size: 1) + + result = Pool.query!(pool, "SELECT 42 as num", [], timeout: 10_000) + assert {["num"], [[42]]} = result + + Supervisor.stop(pool.pid) + end + + test "with_stream! accepts timeout option" do + pool_name = :"timeout_pool_#{:erlang.unique_integer([:positive])}" + {:ok, pool} = Pool.start_link(name: pool_name, database: ":memory:", pool_size: 1) + + result = + Pool.with_stream!( + pool, + "SELECT 1 as num", + [], + fn stream -> + stream |> Stream.flat_map(& &1) |> Enum.to_list() + end, + timeout: 10_000 + ) + + assert result == [[1]] + + Supervisor.stop(pool.pid) + end + end + + describe "file-based SQL with persistent database" do + setup do + path = Path.join(System.tmp_dir!(), "sqlkit_filebased_#{:erlang.unique_integer([:positive])}.duckdb") + + on_exit(fn -> + File.rm(path) + File.rm(path <> ".wal") + end) + + %{db_path: path} + end + + test "queries work with file-based pool and data persists across restarts", %{db_path: path} do + pool_name = DuckDBPool + + # Start pool with file-based database + {:ok, pool} = Pool.start_link(name: pool_name, database: path, pool_size: 2) + + # Set up test data + Pool.checkout!(pool, fn conn -> + DuckDB.query!(conn, "CREATE TABLE users (id INTEGER, name VARCHAR, email VARCHAR, age INTEGER)", []) + DuckDB.query!(conn, "INSERT INTO users VALUES (1, 'Alice', 'alice@test.com', 30)", []) + DuckDB.query!(conn, "INSERT INTO users VALUES (2, 'Bob', 'bob@test.com', 25)", []) + DuckDB.query!(conn, "INSERT INTO users VALUES (3, 'Charlie', 'charlie@test.com', 35)", []) + end) + + # Query using file-based SQL module + results = DuckDBSQL.query_all!("all_users.sql") + assert length(results) == 3 + assert hd(results).name == "Alice" + + # Query with parameters + result = DuckDBSQL.query_one!("user_by_id.sql", [2]) + assert result.name == "Bob" + + # Stop the pool completely + Supervisor.stop(pool.pid) + refute Process.alive?(pool.pid) + + # Start a new pool with the same database file + {:ok, pool2} = Pool.start_link(name: pool_name, database: path, pool_size: 2) + + # Data should still be there - query using file-based SQL module + results2 = DuckDBSQL.query_all!("all_users.sql") + assert length(results2) == 3 + + # Verify specific queries still work + result2 = DuckDBSQL.query_one!("user_by_id.sql", [3]) + assert result2.name == "Charlie" + + # Test with :as option after restart + users = DuckDBSQL.query_all!("all_users.sql", [], as: User) + assert length(users) == 3 + assert %User{id: 1, name: "Alice"} = hd(users) + + Supervisor.stop(pool2.pid) + end + end +end diff --git a/test/support/sql/test_duckdb/all_users.sql b/test/support/sql/test_duckdb/all_users.sql new file mode 100644 index 0000000..5077221 --- /dev/null +++ b/test/support/sql/test_duckdb/all_users.sql @@ -0,0 +1 @@ +SELECT id, name, email, age FROM users ORDER BY id diff --git a/test/support/sql/test_duckdb/first_user.sql b/test/support/sql/test_duckdb/first_user.sql new file mode 100644 index 0000000..c297fb7 --- /dev/null +++ b/test/support/sql/test_duckdb/first_user.sql @@ -0,0 +1 @@ +SELECT id, name, email, age FROM users ORDER BY id LIMIT 1 diff --git a/test/support/sql/test_duckdb/no_users.sql b/test/support/sql/test_duckdb/no_users.sql new file mode 100644 index 0000000..e88e387 --- /dev/null +++ b/test/support/sql/test_duckdb/no_users.sql @@ -0,0 +1 @@ +SELECT id, name, email, age FROM users WHERE 1 = 0 diff --git a/test/support/sql/test_duckdb/user_by_id.sql b/test/support/sql/test_duckdb/user_by_id.sql new file mode 100644 index 0000000..fa4fade --- /dev/null +++ b/test/support/sql/test_duckdb/user_by_id.sql @@ -0,0 +1 @@ +SELECT id, name, email, age FROM users WHERE id = $1 diff --git a/test/support/sql/test_duckdb/users_by_age_range.sql b/test/support/sql/test_duckdb/users_by_age_range.sql new file mode 100644 index 0000000..c6519f1 --- /dev/null +++ b/test/support/sql/test_duckdb/users_by_age_range.sql @@ -0,0 +1 @@ +SELECT id, name, email, age FROM users WHERE age >= $1 AND age <= $2 ORDER BY id diff --git a/test/support/test_sql_modules.ex b/test/support/test_sql_modules.ex index 1839257..1cf37cf 100644 --- a/test/support/test_sql_modules.ex +++ b/test/support/test_sql_modules.ex @@ -43,6 +43,15 @@ defmodule SqlKit.Test.ClickHouseSQL do files: ["all_users.sql", "first_user.sql", "no_users.sql", "user_by_id.sql", "users_by_age_range.sql"] end +defmodule SqlKit.Test.DuckDBSQL do + @moduledoc false + use SqlKit, + otp_app: :sql_kit, + backend: {:duckdb, pool: SqlKit.Test.DuckDBPool}, + dirname: "test_duckdb", + files: ["all_users.sql", "first_user.sql", "no_users.sql", "user_by_id.sql", "users_by_age_range.sql"] +end + # Test struct for casting defmodule SqlKit.Test.User do @moduledoc false