diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..19f0a3a --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[env] +SQLX_OFFLINE = "true" diff --git a/.github/workflows/gateway.yml b/.github/workflows/gateway.yml index fafd111..98798cb 100644 --- a/.github/workflows/gateway.yml +++ b/.github/workflows/gateway.yml @@ -37,10 +37,15 @@ jobs: --health-retries 12 env: TEST_DATABASE_URL: postgres://agent_gateway_admin:agent_gateway_dev@localhost:5432/agent_gateway_test + SQLX_OFFLINE: true steps: - uses: actions/checkout@v5 - uses: dtolnay/rust-toolchain@stable - uses: Swatinem/rust-cache@v2 + - run: cargo install sqlx-cli --version 0.8.6 --locked --no-default-features --features postgres + - run: SQLX_OFFLINE=false DATABASE_URL="$TEST_DATABASE_URL" cargo sqlx database setup + - run: SQLX_OFFLINE=false DATABASE_URL="$TEST_DATABASE_URL" cargo sqlx prepare --check -- --all-targets --locked + - run: cargo clippy --locked --all-targets - run: cargo test --locked image: diff --git a/.sqlx/query-2e5bfa8b38514ea1eb638c158622d5345cb1bcd943017773493f3ce555c09710.json b/.sqlx/query-2e5bfa8b38514ea1eb638c158622d5345cb1bcd943017773493f3ce555c09710.json new file mode 100644 index 0000000..9671a70 --- /dev/null +++ b/.sqlx/query-2e5bfa8b38514ea1eb638c158622d5345cb1bcd943017773493f3ce555c09710.json @@ -0,0 +1,21 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO permission_registry (\n permission_id, signing_key_id, subject_identity, subject_public_key_spki_der, destination,\n not_before, not_after, signature\n )\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text", + "Text", + "Bytea", + "Text", + "Timestamptz", + "Timestamptz", + "Bytea" + ] + }, + "nullable": [] + }, + "hash": "2e5bfa8b38514ea1eb638c158622d5345cb1bcd943017773493f3ce555c09710" +} diff --git a/.sqlx/query-3535f76a2186e39edf59d8e40879b7f4ac530e3a18295b3b32a9da7c7b65b7c8.json b/.sqlx/query-3535f76a2186e39edf59d8e40879b7f4ac530e3a18295b3b32a9da7c7b65b7c8.json new file mode 100644 index 0000000..318c7a6 --- /dev/null +++ b/.sqlx/query-3535f76a2186e39edf59d8e40879b7f4ac530e3a18295b3b32a9da7c7b65b7c8.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM principal_key_permissions WHERE signing_key_id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [] + }, + "hash": "3535f76a2186e39edf59d8e40879b7f4ac530e3a18295b3b32a9da7c7b65b7c8" +} diff --git a/.sqlx/query-3e49d130c02ac882e0c36f80b8258fb7beba42c39921be0287db22b4a79a0b46.json b/.sqlx/query-3e49d130c02ac882e0c36f80b8258fb7beba42c39921be0287db22b4a79a0b46.json new file mode 100644 index 0000000..355bcbc --- /dev/null +++ b/.sqlx/query-3e49d130c02ac882e0c36f80b8258fb7beba42c39921be0287db22b4a79a0b46.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM permission_registry WHERE signing_key_id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [] + }, + "hash": "3e49d130c02ac882e0c36f80b8258fb7beba42c39921be0287db22b4a79a0b46" +} diff --git a/.sqlx/query-47cff29e5dcfc88d719d162f656cbf02672645bc5c2565b39d8949e2a2212909.json b/.sqlx/query-47cff29e5dcfc88d719d162f656cbf02672645bc5c2565b39d8949e2a2212909.json new file mode 100644 index 0000000..c7e92c9 --- /dev/null +++ b/.sqlx/query-47cff29e5dcfc88d719d162f656cbf02672645bc5c2565b39d8949e2a2212909.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO principal_key_permissions (\n signing_key_id, destination, not_before, not_after\n )\n VALUES ($1, $2, $3, $4)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text", + "Timestamptz", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "47cff29e5dcfc88d719d162f656cbf02672645bc5c2565b39d8949e2a2212909" +} diff --git a/.sqlx/query-5ec9ac85b3956ecac884f312e7a8f21540c6f986aec6c6cc0f4b7ff53ffcc68a.json b/.sqlx/query-5ec9ac85b3956ecac884f312e7a8f21540c6f986aec6c6cc0f4b7ff53ffcc68a.json new file mode 100644 index 0000000..fd72afe --- /dev/null +++ b/.sqlx/query-5ec9ac85b3956ecac884f312e7a8f21540c6f986aec6c6cc0f4b7ff53ffcc68a.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE permission_registry SET revoked_at = now() WHERE permission_id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [] + }, + "hash": "5ec9ac85b3956ecac884f312e7a8f21540c6f986aec6c6cc0f4b7ff53ffcc68a" +} diff --git a/.sqlx/query-81ab64535042f97a4a1143873b068720752c4ab96c19a01d2ef23a716c063454.json b/.sqlx/query-81ab64535042f97a4a1143873b068720752c4ab96c19a01d2ef23a716c063454.json new file mode 100644 index 0000000..261b57d --- /dev/null +++ b/.sqlx/query-81ab64535042f97a4a1143873b068720752c4ab96c19a01d2ef23a716c063454.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO principal_signing_keys (\n key_id, algorithm, public_key_spki_der,\n not_before, not_after\n )\n VALUES ($1, 'ecdsa_p256_sha256', $2, $3, $4)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Bytea", + "Timestamptz", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "81ab64535042f97a4a1143873b068720752c4ab96c19a01d2ef23a716c063454" +} diff --git a/.sqlx/query-cd2c5f37e6caabc0789ca458e8bc52f23b750c3555030c5c15c40a5a13226b8f.json b/.sqlx/query-cd2c5f37e6caabc0789ca458e8bc52f23b750c3555030c5c15c40a5a13226b8f.json new file mode 100644 index 0000000..54c768d --- /dev/null +++ b/.sqlx/query-cd2c5f37e6caabc0789ca458e8bc52f23b750c3555030c5c15c40a5a13226b8f.json @@ -0,0 +1,25 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT EXISTS (\n SELECT 1\n FROM principal_key_permissions\n WHERE signing_key_id = $1\n AND destination = $2\n AND revoked_at IS NULL\n AND not_before <= now()\n AND not_after > now()\n AND not_before <= $3\n AND not_after >= $4\n ) AS \"exists!\"\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "exists!", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Text", + "Text", + "Timestamptz", + "Timestamptz" + ] + }, + "nullable": [ + null + ] + }, + "hash": "cd2c5f37e6caabc0789ca458e8bc52f23b750c3555030c5c15c40a5a13226b8f" +} diff --git a/.sqlx/query-e3b8aebb3bd68bb84e6d09cb0b214c5ba1e25a847aea0e877f2a6682a760efa6.json b/.sqlx/query-e3b8aebb3bd68bb84e6d09cb0b214c5ba1e25a847aea0e877f2a6682a760efa6.json new file mode 100644 index 0000000..c4f6a9e --- /dev/null +++ b/.sqlx/query-e3b8aebb3bd68bb84e6d09cb0b214c5ba1e25a847aea0e877f2a6682a760efa6.json @@ -0,0 +1,102 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n p.permission_id,\n p.subject_identity,\n p.subject_public_key_spki_der,\n p.destination,\n p.signing_key_id,\n p.not_before AS \"permission_not_before!\",\n p.not_after AS \"permission_not_after!\",\n p.signature,\n s.algorithm AS \"signer_algorithm!\",\n s.public_key_spki_der AS \"signer_public_key_spki_der!\",\n s.not_before AS \"signer_not_before!\",\n s.not_after AS \"signer_not_after!\",\n s.revoked_at AS signer_revoked_at,\n (\n s.revoked_at IS NULL\n AND s.not_before <= now()\n AND s.not_after > now()\n ) AS \"signer_active_now!\"\n FROM permission_registry p\n JOIN principal_signing_keys s ON s.key_id = p.signing_key_id\n WHERE p.subject_identity = $1\n AND p.destination = $2\n AND p.subject_public_key_spki_der = $3\n AND p.revoked_at IS NULL\n AND p.not_before <= now()\n AND p.not_after > now()\n ORDER BY p.not_after DESC\n LIMIT 16\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "permission_id", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "subject_identity", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "subject_public_key_spki_der", + "type_info": "Bytea" + }, + { + "ordinal": 3, + "name": "destination", + "type_info": "Text" + }, + { + "ordinal": 4, + "name": "signing_key_id", + "type_info": "Text" + }, + { + "ordinal": 5, + "name": "permission_not_before!", + "type_info": "Timestamptz" + }, + { + "ordinal": 6, + "name": "permission_not_after!", + "type_info": "Timestamptz" + }, + { + "ordinal": 7, + "name": "signature", + "type_info": "Bytea" + }, + { + "ordinal": 8, + "name": "signer_algorithm!", + "type_info": "Text" + }, + { + "ordinal": 9, + "name": "signer_public_key_spki_der!", + "type_info": "Bytea" + }, + { + "ordinal": 10, + "name": "signer_not_before!", + "type_info": "Timestamptz" + }, + { + "ordinal": 11, + "name": "signer_not_after!", + "type_info": "Timestamptz" + }, + { + "ordinal": 12, + "name": "signer_revoked_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 13, + "name": "signer_active_now!", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Text", + "Text", + "Bytea" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + true, + null + ] + }, + "hash": "e3b8aebb3bd68bb84e6d09cb0b214c5ba1e25a847aea0e877f2a6682a760efa6" +} diff --git a/.sqlx/query-e4321d59e96d83a538a2a346dd68114a279557993f7757427cde78110cbef914.json b/.sqlx/query-e4321d59e96d83a538a2a346dd68114a279557993f7757427cde78110cbef914.json new file mode 100644 index 0000000..d7ab5b5 --- /dev/null +++ b/.sqlx/query-e4321d59e96d83a538a2a346dd68114a279557993f7757427cde78110cbef914.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM principal_signing_keys WHERE key_id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [] + }, + "hash": "e4321d59e96d83a538a2a346dd68114a279557993f7757427cde78110cbef914" +} diff --git a/.sqlx/query-f11acb3b0cda9d25dc5c2e17886b2dbfeea4294861273777f0a725658caaea05.json b/.sqlx/query-f11acb3b0cda9d25dc5c2e17886b2dbfeea4294861273777f0a725658caaea05.json new file mode 100644 index 0000000..edf330f --- /dev/null +++ b/.sqlx/query-f11acb3b0cda9d25dc5c2e17886b2dbfeea4294861273777f0a725658caaea05.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE permission_registry SET destination = $2 WHERE permission_id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "f11acb3b0cda9d25dc5c2e17886b2dbfeea4294861273777f0a725658caaea05" +} diff --git a/.sqlx/query-f2680b46be548e799c733e3fadbcb8c9c25750ab7d273575d7b0923f1fa503db.json b/.sqlx/query-f2680b46be548e799c733e3fadbcb8c9c25750ab7d273575d7b0923f1fa503db.json new file mode 100644 index 0000000..366d649 --- /dev/null +++ b/.sqlx/query-f2680b46be548e799c733e3fadbcb8c9c25750ab7d273575d7b0923f1fa503db.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT version FROM agent_gateway_schema_version ORDER BY version DESC LIMIT 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "version", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false + ] + }, + "hash": "f2680b46be548e799c733e3fadbcb8c9c25750ab7d273575d7b0923f1fa503db" +} diff --git a/.sqlx/query-f37016d64612f7683b6c1be89eb172cf9fd9f432cd602e970ab33480e40c1320.json b/.sqlx/query-f37016d64612f7683b6c1be89eb172cf9fd9f432cd602e970ab33480e40c1320.json new file mode 100644 index 0000000..105c213 --- /dev/null +++ b/.sqlx/query-f37016d64612f7683b6c1be89eb172cf9fd9f432cd602e970ab33480e40c1320.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE principal_signing_keys SET revoked_at = now() WHERE key_id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [] + }, + "hash": "f37016d64612f7683b6c1be89eb172cf9fd9f432cd602e970ab33480e40c1320" +} diff --git a/Cargo.toml b/Cargo.toml index 0c0f29f..ae36012 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,9 @@ name = "agent_gateway" version = "0.1.0" edition = "2024" +[lints.clippy] +pedantic = "deny" + [dependencies] tokio = { version = "1", features = ["io-util", "macros", "net", "rt-multi-thread", "signal", "time"] } rustls = { version = "0.23", default-features = false, features = ["aws-lc-rs", "std", "tls12"] } @@ -26,7 +29,7 @@ opentelemetry = "0.31" opentelemetry_sdk = { version = "0.31", default-features = false, features = ["rt-tokio", "trace"] } opentelemetry-otlp = { version = "0.31", default-features = false, features = ["grpc-tonic", "trace"] } anyhow = "1" -sqlx = { version = "0.8.6", default-features = false, features = ["runtime-tokio-rustls", "postgres", "chrono", "derive"] } +sqlx = { version = "0.8.6", default-features = false, features = ["runtime-tokio-rustls", "postgres", "chrono", "macros"] } chrono = "0.4.44" p256 = { version = "0.13.2", features = ["ecdsa", "pkcs8"] } diff --git a/README.md b/README.md index 8903805..1efd7e3 100644 --- a/README.md +++ b/README.md @@ -151,3 +151,15 @@ cargo test ``` Database-backed policy and e2e tests require `TEST_DATABASE_URL` to point at a Postgres database that the test process can migrate and write to. The tests cover policy evaluation, signed-permission verification, signer delegation scope enforcement, destination normalization, config validation, TLS PKI generation, and proxy request parsing. + +## SQLx Query Metadata + +Checked SQLx query macros compile against committed `.sqlx` metadata by default, so normal builds do not need database access. The metadata is derived from the migrations; it does not replace the migration SQL used to create or update the database schema. + +Regenerate the metadata after changing migrations or SQL query text: + +```bash +cargo install sqlx-cli --version 0.8.6 --locked --no-default-features --features postgres +SQLX_OFFLINE=false DATABASE_URL="$TEST_DATABASE_URL" cargo sqlx database setup +SQLX_OFFLINE=false DATABASE_URL="$TEST_DATABASE_URL" cargo sqlx prepare -- --all-targets --locked +``` diff --git a/src/config.rs b/src/config.rs index 9a65a24..28d4936 100644 --- a/src/config.rs +++ b/src/config.rs @@ -41,6 +41,12 @@ pub struct PolicyConfig { } impl Config { + /// Load, parse, and validate a TOML config file. + /// + /// # Errors + /// + /// Returns an error if the file cannot be read, the TOML cannot be parsed, + /// or any config value fails validation. pub fn load(path: &Path) -> anyhow::Result { let contents = std::fs::read_to_string(path)?; let config: Config = toml::from_str(&contents)?; @@ -61,6 +67,12 @@ impl Config { } impl PolicyConfig { + /// Resolve the configured database URL from the literal value or env var. + /// + /// # Errors + /// + /// Returns an error if no source is configured, both sources are configured, + /// the configured source is empty, or the environment variable cannot be read. pub fn database_url(&self) -> anyhow::Result { match (&self.database_url, &self.database_url_env) { (Some(url), None) => { @@ -89,18 +101,22 @@ impl PolicyConfig { } } + #[must_use] pub fn max_connections(&self) -> u32 { self.max_connections.unwrap_or(5) } + #[must_use] pub fn connect_timeout(&self) -> Duration { Duration::from_millis(self.connect_timeout_ms.unwrap_or(5_000)) } + #[must_use] pub fn pool_acquire_timeout(&self) -> Duration { Duration::from_millis(self.pool_acquire_timeout_ms.unwrap_or(1_000)) } + #[must_use] pub fn query_timeout(&self) -> Duration { Duration::from_millis(self.query_timeout_ms.unwrap_or(500)) } @@ -119,13 +135,13 @@ impl PolicyConfig { match (&self.database_url, &self.database_url_env) { (Some(url), None) => { - anyhow::ensure!(!url.is_empty(), "policy.database_url must not be empty") + anyhow::ensure!(!url.is_empty(), "policy.database_url must not be empty"); } (None, Some(env_name)) => { anyhow::ensure!( !env_name.is_empty(), "policy.database_url_env must not be empty" - ) + ); } (None, None) => { anyhow::bail!("policy.database_url or policy.database_url_env is required") diff --git a/src/observability.rs b/src/observability.rs index 0767ae7..96e9ad5 100644 --- a/src/observability.rs +++ b/src/observability.rs @@ -12,6 +12,12 @@ use crate::config::ObservabilityConfig; static TRACER_PROVIDER: OnceLock = OnceLock::new(); +/// Initialize tracing and optional OpenTelemetry export. +/// +/// # Errors +/// +/// Returns an error if the OTLP exporter cannot be built or the global tracing +/// subscriber cannot be initialized. pub fn init(config: &ObservabilityConfig) -> anyhow::Result<()> { global::set_text_map_propagator(TraceContextPropagator::new()); @@ -55,14 +61,12 @@ pub fn init(config: &ObservabilityConfig) -> anyhow::Result<()> { } fn stdout_logging_enabled() -> bool { - std::env::var("AGENT_GATEWAY_LOG_STDOUT") - .map(|value| { - !matches!( - value.trim().to_ascii_lowercase().as_str(), - "0" | "false" | "off" | "no" - ) - }) - .unwrap_or(true) + std::env::var("AGENT_GATEWAY_LOG_STDOUT").map_or(true, |value| { + !matches!( + value.trim().to_ascii_lowercase().as_str(), + "0" | "false" | "off" | "no" + ) + }) } pub fn shutdown() { diff --git a/src/policy.rs b/src/policy.rs index 00d1a27..08cf163 100644 --- a/src/policy.rs +++ b/src/policy.rs @@ -50,6 +50,13 @@ impl PostgresPolicyEngine { } } +/// Build the configured policy engine. +/// +/// # Errors +/// +/// Returns an error if the database URL is invalid, the database cannot be +/// reached, the schema version does not match, or the client extension OID is +/// invalid. pub async fn build_engine(config: &PolicyConfig) -> anyhow::Result> { let db_pool = build_pg_pool(config).await?; RegistryStore::verify_schema_version(&db_pool).await?; @@ -284,7 +291,7 @@ fn certificate_subject( return Err("extension value contains trailing bytes".into()); } Ok(CertificateSubject { - identity: v.string().to_owned(), + identity: v.string().clone(), public_key_spki_der, }) } diff --git a/src/proxy.rs b/src/proxy.rs index 61e8829..ef3beba 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -32,7 +32,7 @@ impl Extractor for HeaderExtractor<'_> { } fn keys(&self) -> Vec<&str> { - self.0.keys().map(|name| name.as_str()).collect() + self.0.keys().map(http::HeaderName::as_str).collect() } } @@ -96,29 +96,14 @@ impl ProxyService { source_identity } PolicyDecision::Deny { - source_identity: Some(source_identity), + source_identity, reason, } => { - warn!( - source_identity = %source_identity, - source_peer_addr = %self.source_peer_addr, - dest_authority = %dest.authority, - policy_decision = "deny", - deny_reason = %reason, - "CONNECT denied" - ); - return response(StatusCode::FORBIDDEN, "forbidden"); - } - PolicyDecision::Deny { - source_identity: None, - reason, - } => { - warn!( - source_peer_addr = %self.source_peer_addr, - dest_authority = %dest.authority, - policy_decision = "deny", - deny_reason = %reason, - "CONNECT denied" + log_denial( + source_identity.as_deref(), + self.source_peer_addr, + &dest.authority, + &reason, ); return response(StatusCode::FORBIDDEN, "forbidden"); } @@ -126,7 +111,7 @@ impl ProxyService { // Connect to destination BEFORE returning 200 so the client knows // the tunnel is actually established. - let mut upstream = match TcpStream::connect((&*dest.host, dest.port)).await { + let upstream = match TcpStream::connect((&*dest.host, dest.port)).await { Ok(s) => s, Err(e) => { error!( @@ -141,50 +126,12 @@ impl ProxyService { }; let on_upgrade = hyper::upgrade::on(req); - let source_peer_addr = self.source_peer_addr; - - let tunnel_span = tracing::Span::current(); - tokio::spawn( - async move { - let upgraded = match on_upgrade.await { - Ok(u) => u, - Err(e) => { - warn!( - source_identity = %source_identity, - source_peer_addr = %source_peer_addr, - dest_authority = %dest.authority, - error = %e, - "upgrade failed" - ); - return; - } - }; - - let mut downstream = hyper_util::rt::TokioIo::new(upgraded); - - match copy_bidirectional(&mut downstream, &mut upstream).await { - Ok((up, down)) => { - info!( - source_identity = %source_identity, - source_peer_addr = %source_peer_addr, - dest_authority = %dest.authority, - bytes_client_to_dest = up, - bytes_dest_to_client = down, - "tunnel closed" - ); - } - Err(e) => { - error!( - source_identity = %source_identity, - source_peer_addr = %source_peer_addr, - dest_authority = %dest.authority, - error = %e, - "tunnel error" - ); - } - } - } - .instrument(tunnel_span), + spawn_tunnel( + on_upgrade, + upstream, + source_identity, + self.source_peer_addr, + dest.authority, ); response(StatusCode::OK, "") @@ -214,6 +161,7 @@ impl MakeProxyService { Self { policy_engine } } + #[must_use] pub fn make_service( &self, peer_certs: Vec>, @@ -223,12 +171,91 @@ impl MakeProxyService { } } +#[must_use] pub fn extract_peer_certs(conn: &ServerConnection) -> Vec> { conn.peer_certificates() - .map(|certs| certs.to_vec()) + .map(<[CertificateDer<'_>]>::to_vec) .unwrap_or_default() } +fn log_denial( + source_identity: Option<&str>, + source_peer_addr: SocketAddr, + dest_authority: &str, + reason: &str, +) { + if let Some(source_identity) = source_identity { + warn!( + source_identity = %source_identity, + source_peer_addr = %source_peer_addr, + dest_authority = %dest_authority, + policy_decision = "deny", + deny_reason = %reason, + "CONNECT denied" + ); + } else { + warn!( + source_peer_addr = %source_peer_addr, + dest_authority = %dest_authority, + policy_decision = "deny", + deny_reason = %reason, + "CONNECT denied" + ); + } +} + +fn spawn_tunnel( + on_upgrade: hyper::upgrade::OnUpgrade, + mut upstream: TcpStream, + source_identity: String, + source_peer_addr: SocketAddr, + dest_authority: String, +) { + let tunnel_span = tracing::Span::current(); + tokio::spawn( + async move { + let upgraded = match on_upgrade.await { + Ok(u) => u, + Err(e) => { + warn!( + source_identity = %source_identity, + source_peer_addr = %source_peer_addr, + dest_authority = %dest_authority, + error = %e, + "upgrade failed" + ); + return; + } + }; + + let mut downstream = hyper_util::rt::TokioIo::new(upgraded); + + match copy_bidirectional(&mut downstream, &mut upstream).await { + Ok((up, down)) => { + info!( + source_identity = %source_identity, + source_peer_addr = %source_peer_addr, + dest_authority = %dest_authority, + bytes_client_to_dest = up, + bytes_dest_to_client = down, + "tunnel closed" + ); + } + Err(e) => { + error!( + source_identity = %source_identity, + source_peer_addr = %source_peer_addr, + dest_authority = %dest_authority, + error = %e, + "tunnel error" + ); + } + } + } + .instrument(tunnel_span), + ); +} + fn response(status: StatusCode, message: &str) -> Response { let body: ProxyBody = if message.is_empty() { Empty::::new().boxed() diff --git a/src/registry.rs b/src/registry.rs index 866cc83..41d4441 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -12,7 +12,7 @@ pub(crate) struct RegistryStore { query_timeout: Duration, } -#[derive(Debug, Clone, sqlx::FromRow)] +#[derive(Debug, Clone)] pub(crate) struct CandidatePermission { pub(crate) permission_id: String, pub(crate) subject_identity: String, @@ -39,7 +39,7 @@ impl RegistryStore { } pub(crate) async fn verify_schema_version(pool: &PgPool) -> anyhow::Result<()> { - let version = sqlx::query_scalar::<_, i32>( + let version = sqlx::query_scalar!( "SELECT version FROM agent_gateway_schema_version ORDER BY version DESC LIMIT 1", ) .fetch_one(pool) @@ -59,7 +59,8 @@ impl RegistryStore { destination: &str, subject_public_key_spki_der: &[u8], ) -> anyhow::Result> { - let query = sqlx::query_as::<_, CandidatePermission>( + let query = sqlx::query_as!( + CandidatePermission, r#" SELECT p.permission_id, @@ -67,19 +68,19 @@ impl RegistryStore { p.subject_public_key_spki_der, p.destination, p.signing_key_id, - p.not_before AS permission_not_before, - p.not_after AS permission_not_after, + p.not_before AS "permission_not_before!", + p.not_after AS "permission_not_after!", p.signature, - s.algorithm AS signer_algorithm, - s.public_key_spki_der AS signer_public_key_spki_der, - s.not_before AS signer_not_before, - s.not_after AS signer_not_after, + s.algorithm AS "signer_algorithm!", + s.public_key_spki_der AS "signer_public_key_spki_der!", + s.not_before AS "signer_not_before!", + s.not_after AS "signer_not_after!", s.revoked_at AS signer_revoked_at, ( s.revoked_at IS NULL AND s.not_before <= now() AND s.not_after > now() - ) AS signer_active_now + ) AS "signer_active_now!" FROM permission_registry p JOIN principal_signing_keys s ON s.key_id = p.signing_key_id WHERE p.subject_identity = $1 @@ -91,10 +92,10 @@ impl RegistryStore { ORDER BY p.not_after DESC LIMIT 16 "#, - ) - .bind(subject_identity) - .bind(destination) - .bind(subject_public_key_spki_der); + subject_identity, + destination, + subject_public_key_spki_der, + ); tokio::time::timeout(self.query_timeout, query.fetch_all(&self.pool)) .await @@ -109,7 +110,7 @@ impl RegistryStore { permission_not_before: DateTime, permission_not_after: DateTime, ) -> anyhow::Result { - let query = sqlx::query_scalar::<_, bool>( + let query = sqlx::query_scalar!( r#" SELECT EXISTS ( SELECT 1 @@ -121,13 +122,13 @@ impl RegistryStore { AND not_after > now() AND not_before <= $3 AND not_after >= $4 - ) + ) AS "exists!" "#, - ) - .bind(signing_key_id) - .bind(destination) - .bind(permission_not_before) - .bind(permission_not_after); + signing_key_id, + destination, + permission_not_before, + permission_not_after, + ); tokio::time::timeout(self.query_timeout, query.fetch_one(&self.pool)) .await diff --git a/src/tls.rs b/src/tls.rs index 3248f0b..22f6215 100644 --- a/src/tls.rs +++ b/src/tls.rs @@ -16,6 +16,12 @@ use x509_parser::prelude::*; use crate::config::ServerConfig as AppServerConfig; +/// Build the rustls server config used by the gateway listener. +/// +/// # Errors +/// +/// Returns an error if certificates or private keys cannot be read, parsed, or +/// accepted by rustls. pub fn build_server_config(config: &AppServerConfig) -> anyhow::Result> { let cert_chain = load_cert_chain(&config.tls_cert_path) .with_context(|| format!("loading TLS cert from {}", config.tls_cert_path.display()))?; @@ -31,6 +37,7 @@ pub fn build_server_config(config: &AppServerConfig) -> anyhow::Result Arc { Arc::new(DbRootedClientCertVerifier::new()) } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 879d742..4b33d8b 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -149,7 +149,7 @@ impl TestPki { } pub fn client_spki_der(&self) -> Vec { - certificate_spki_der(&self.client_cert.der().to_vec()) + certificate_spki_der(self.client_cert.der()) } } @@ -168,16 +168,19 @@ fn der_encode_utf8_string(value: &str) -> Vec { let mut encoded = Vec::with_capacity(2 + len); encoded.push(0x0c); // UTF8String tag if len < 128 { - encoded.push(len as u8); + encoded.push(u8::try_from(len).expect("short-form DER length fits in u8")); } else { let mut len_bytes = Vec::new(); let mut remaining = len; while remaining > 0 { - len_bytes.push((remaining & 0xff) as u8); + len_bytes + .push(u8::try_from(remaining & 0xff).expect("masked DER length byte fits in u8")); remaining >>= 8; } len_bytes.reverse(); - encoded.push(0x80 | (len_bytes.len() as u8)); + let len_byte_count = + u8::try_from(len_bytes.len()).expect("DER length-of-length fits in u8"); + encoded.push(0x80 | len_byte_count); encoded.extend_from_slice(&len_bytes); } encoded.extend_from_slice(value_bytes); @@ -323,10 +326,10 @@ pub async fn wait_for_event( /// Acquire an exclusive lock for e2e tests that share a global tracing subscriber. /// Hold the returned guard for the duration of the test. -static E2E_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(()); +static E2E_MUTEX: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(()); -pub fn serial_test_lock() -> std::sync::MutexGuard<'static, ()> { - E2E_MUTEX.lock().unwrap_or_else(|e| e.into_inner()) +pub async fn serial_test_lock() -> tokio::sync::MutexGuard<'static, ()> { + E2E_MUTEX.lock().await } pub fn install_test_crypto_provider() { @@ -379,19 +382,19 @@ impl TestAuthzRegistry { let key_id = unique_id("test-key"); let (not_before, not_after) = active_window(); - sqlx::query( - r#" + sqlx::query!( + r" INSERT INTO principal_signing_keys ( key_id, algorithm, public_key_spki_der, not_before, not_after ) VALUES ($1, 'ecdsa_p256_sha256', $2, $3, $4) - "#, + ", + &key_id, + public_key_spki_der.as_slice(), + not_before, + not_after, ) - .bind(&key_id) - .bind(&public_key_spki_der) - .bind(not_before) - .bind(not_after) .execute(&pool) .await .expect("insert test signing key"); @@ -421,21 +424,27 @@ impl TestAuthzRegistry { } pub async fn cleanup(&self) { - sqlx::query("DELETE FROM permission_registry WHERE signing_key_id = $1") - .bind(&self.key_id) - .execute(&self.pool) - .await - .expect("delete test permissions"); - sqlx::query("DELETE FROM principal_key_permissions WHERE signing_key_id = $1") - .bind(&self.key_id) - .execute(&self.pool) - .await - .expect("delete test signer scopes"); - sqlx::query("DELETE FROM principal_signing_keys WHERE key_id = $1") - .bind(&self.key_id) - .execute(&self.pool) - .await - .expect("delete test signing key"); + sqlx::query!( + "DELETE FROM permission_registry WHERE signing_key_id = $1", + &self.key_id + ) + .execute(&self.pool) + .await + .expect("delete test permissions"); + sqlx::query!( + "DELETE FROM principal_key_permissions WHERE signing_key_id = $1", + &self.key_id + ) + .execute(&self.pool) + .await + .expect("delete test signer scopes"); + sqlx::query!( + "DELETE FROM principal_signing_keys WHERE key_id = $1", + &self.key_id + ) + .execute(&self.pool) + .await + .expect("delete test signing key"); } pub async fn allow( @@ -491,28 +500,34 @@ impl TestAuthzRegistry { } pub async fn revoke_permission(&self, permission_id: &str) { - sqlx::query("UPDATE permission_registry SET revoked_at = now() WHERE permission_id = $1") - .bind(permission_id) - .execute(&self.pool) - .await - .expect("revoke permission"); + sqlx::query!( + "UPDATE permission_registry SET revoked_at = now() WHERE permission_id = $1", + permission_id + ) + .execute(&self.pool) + .await + .expect("revoke permission"); } pub async fn revoke_signer(&self) { - sqlx::query("UPDATE principal_signing_keys SET revoked_at = now() WHERE key_id = $1") - .bind(&self.key_id) - .execute(&self.pool) - .await - .expect("revoke signer"); + sqlx::query!( + "UPDATE principal_signing_keys SET revoked_at = now() WHERE key_id = $1", + &self.key_id + ) + .execute(&self.pool) + .await + .expect("revoke signer"); } pub async fn tamper_permission_destination(&self, permission_id: &str, destination: &str) { - sqlx::query("UPDATE permission_registry SET destination = $2 WHERE permission_id = $1") - .bind(permission_id) - .bind(destination) - .execute(&self.pool) - .await - .expect("tamper permission destination"); + sqlx::query!( + "UPDATE permission_registry SET destination = $2 WHERE permission_id = $1", + permission_id, + destination + ) + .execute(&self.pool) + .await + .expect("tamper permission destination"); } async fn allow_inner( @@ -526,18 +541,18 @@ impl TestAuthzRegistry { let (not_before, not_after) = active_window(); if include_scope { - sqlx::query( - r#" + sqlx::query!( + r" INSERT INTO principal_key_permissions ( signing_key_id, destination, not_before, not_after ) VALUES ($1, $2, $3, $4) - "#, + ", + &self.key_id, + destination, + not_before, + not_after, ) - .bind(&self.key_id) - .bind(destination) - .bind(not_before) - .bind(not_after) .execute(&self.pool) .await .expect("insert signer scope"); @@ -553,24 +568,25 @@ impl TestAuthzRegistry { not_after, ); let signature: p256::ecdsa::Signature = self.signing_key.sign(&signed_bytes); + let signature_der = signature.to_der(); - sqlx::query( - r#" + sqlx::query!( + r" INSERT INTO permission_registry ( permission_id, signing_key_id, subject_identity, subject_public_key_spki_der, destination, not_before, not_after, signature ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - "#, + ", + &permission_id, + &self.key_id, + subject_identity, + subject_public_key_spki_der, + destination, + not_before, + not_after, + signature_der.as_bytes(), ) - .bind(&permission_id) - .bind(&self.key_id) - .bind(subject_identity) - .bind(subject_public_key_spki_der) - .bind(destination) - .bind(not_before) - .bind(not_after) - .bind(signature.to_der().as_bytes()) .execute(&self.pool) .await .expect("insert signed permission"); @@ -702,9 +718,8 @@ pub async fn start_proxy( let task = tokio::spawn(async move { loop { - let (tcp, peer) = match listener.accept().await { - Ok(c) => c, - Err(_) => continue, + let Ok((tcp, peer)) = listener.accept().await else { + continue; }; let acceptor = tls_acceptor.clone(); let svc = make_service.clone(); @@ -736,7 +751,7 @@ pub async fn start_proxy( (addr, ServerGuard { task }) } -/// Connect an HTTP/2 mTLS client to the proxy. Returns a SendRequest handle. +/// Connect an HTTP/2 mTLS client to the proxy. Returns a `SendRequest` handle. pub async fn connect_client( proxy_addr: SocketAddr, pki: &TestPki, diff --git a/tests/e2e.rs b/tests/e2e.rs index caf5c11..46dca67 100644 --- a/tests/e2e.rs +++ b/tests/e2e.rs @@ -35,7 +35,7 @@ fn assert_field_eq(event: &common::CapturedEvent, field: &str, expected: &str) { fn assert_field_present(event: &common::CapturedEvent, field: &str) { assert!( - event.fields.get(field).is_some(), + event.fields.contains_key(field), "{field} should be present" ); } @@ -72,7 +72,7 @@ impl ResolvesClientCert for MismatchedClientCertResolver { #[tokio::test] async fn tunnel_echoes_data() { - let _guard = serial_test_lock(); + let _guard = serial_test_lock().await; let log = init_tracing_capture(); drain_events(&log); @@ -140,7 +140,7 @@ async fn tunnel_echoes_data() { #[tokio::test] async fn tunnel_policy_deny() { - let _guard = serial_test_lock(); + let _guard = serial_test_lock().await; let log = init_tracing_capture(); drain_events(&log); @@ -181,7 +181,7 @@ async fn tunnel_policy_deny() { #[tokio::test] async fn tunnel_unreachable_destination() { - let _guard = serial_test_lock(); + let _guard = serial_test_lock().await; let log = init_tracing_capture(); drain_events(&log); @@ -218,7 +218,7 @@ async fn tunnel_unreachable_destination() { #[tokio::test] async fn non_connect_method_rejected() { - let _guard = serial_test_lock(); + let _guard = serial_test_lock().await; let log = init_tracing_capture(); drain_events(&log); @@ -260,7 +260,7 @@ async fn non_connect_method_rejected() { #[tokio::test] async fn tunnel_wrong_extension_denied() { - let _guard = serial_test_lock(); + let _guard = serial_test_lock().await; let log = init_tracing_capture(); drain_events(&log); @@ -311,7 +311,7 @@ async fn tunnel_wrong_extension_denied() { #[tokio::test] async fn tunnel_missing_extension_denied() { - let _guard = serial_test_lock(); + let _guard = serial_test_lock().await; let log = init_tracing_capture(); drain_events(&log); @@ -375,7 +375,7 @@ async fn tunnel_missing_extension_denied() { #[tokio::test] async fn mtls_no_client_cert_rejected() { - let _guard = serial_test_lock(); + let _guard = serial_test_lock().await; let log = init_tracing_capture(); drain_events(&log); @@ -422,7 +422,7 @@ async fn mtls_no_client_cert_rejected() { #[tokio::test] async fn mtls_mismatched_cert_and_private_key_rejected() { - let _guard = serial_test_lock(); + let _guard = serial_test_lock().await; let log = init_tracing_capture(); drain_events(&log); @@ -470,7 +470,7 @@ async fn mtls_mismatched_cert_and_private_key_rejected() { #[tokio::test] async fn mtls_accepts_untrusted_ca_but_policy_denies_unregistered_key() { - let _guard = serial_test_lock(); + let _guard = serial_test_lock().await; let log = init_tracing_capture(); drain_events(&log); diff --git a/tests/integration.rs b/tests/integration.rs index 8f5aae0..d47c0c1 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -25,7 +25,7 @@ fn assert_allow(decision: PolicyDecision) { } } -fn assert_deny(decision: PolicyDecision) { +fn assert_deny(decision: &PolicyDecision) { if let PolicyDecision::Allow { .. } = decision { panic!("expected Deny, got Allow"); } @@ -86,7 +86,7 @@ async fn policy_denies_wrong_port() { .allow_for_pki(&pki, &subject, "api.example.com:443") .await; let engine = registry.engine(EXT_OID).await; - assert_deny(eval(engine.as_ref(), &pki, "api.example.com:8080").await); + assert_deny(&eval(engine.as_ref(), &pki, "api.example.com:8080").await); registry.cleanup().await; } @@ -182,7 +182,7 @@ async fn policy_config_without_port_denies_non_443() { .allow_for_pki(&pki, &subject, "api.example.com:443") .await; let engine = registry.engine(EXT_OID).await; - assert_deny(eval(engine.as_ref(), &pki, "api.example.com:8080").await); + assert_deny(&eval(engine.as_ref(), &pki, "api.example.com:8080").await); registry.cleanup().await; } @@ -196,7 +196,7 @@ async fn policy_ipv6_config_matches_bracketed_request() { registry.allow_for_pki(&pki, &subject, "[::1]:8443").await; let engine = registry.engine(EXT_OID).await; assert_allow(eval(engine.as_ref(), &pki, "[::1]:8443").await); - assert_deny(eval(engine.as_ref(), &pki, "[::1]:443").await); + assert_deny(&eval(engine.as_ref(), &pki, "[::1]:443").await); registry.cleanup().await; } @@ -208,7 +208,7 @@ async fn policy_bare_ipv6_config_matches_bracketed_request() { registry.allow_for_pki(&pki, &subject, "[::1]:443").await; let engine = registry.engine(EXT_OID).await; assert_allow(eval(engine.as_ref(), &pki, "::1").await); - assert_deny(eval(engine.as_ref(), &pki, "[::1]:8080").await); + assert_deny(&eval(engine.as_ref(), &pki, "[::1]:8080").await); registry.cleanup().await; } @@ -239,7 +239,7 @@ async fn policy_denies_tampered_permission_destination() { .tamper_permission_destination(&permission.permission_id, "evil.example.com:443") .await; let engine = registry.engine(EXT_OID).await; - assert_deny(eval(engine.as_ref(), &pki, "evil.example.com:443").await); + assert_deny(&eval(engine.as_ref(), &pki, "evil.example.com:443").await); registry.cleanup().await; } @@ -304,7 +304,7 @@ async fn policy_denies_signer_scope_violation() { .allow_without_signer_scope_for_pki(&pki, &subject, "api.example.com:443") .await; let engine = registry.engine(EXT_OID).await; - assert_deny(eval(engine.as_ref(), &pki, "api.example.com:443").await); + assert_deny(&eval(engine.as_ref(), &pki, "api.example.com:443").await); registry.cleanup().await; } @@ -538,6 +538,6 @@ async fn proxy_dest_ipv6_matches_policy() { assert_allow(eval(engine.as_ref(), &pki, "[::1]:8443").await); - assert_deny(eval(engine.as_ref(), &pki, "[::1]:443").await); + assert_deny(&eval(engine.as_ref(), &pki, "[::1]:443").await); registry.cleanup().await; }