From 78d265f692f1051b8d0d8abff63b6e5bbdaf4719 Mon Sep 17 00:00:00 2001 From: druidliu Date: Mon, 10 Apr 2023 17:24:33 +0800 Subject: [PATCH 1/6] [fix][broker] Fix tenant admin authorization bug. --- .../PulsarAuthorizationProvider.java | 95 ++++++++++++------- .../pulsar/broker/auth/AuthorizationTest.java | 16 ++-- 2 files changed, 69 insertions(+), 42 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 3f6d38194713e..1913f874f571b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -84,7 +84,18 @@ public void initialize(ServiceConfiguration conf, PulsarResources pulsarResource @Override public CompletableFuture canProduceAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData) { - return checkAuthorization(topicName, role, AuthAction.produce); + return validateTenantAdminAccess(topicName.getTenant(), role, authenticationData) + .thenComposeAsync(isSuperUserOrAdmin -> { + if (log.isDebugEnabled()) { + log.debug("Verify if role {} is allowed to consume topic {}: isSuperUserOrAdmin={}", + role, topicName, isSuperUserOrAdmin); + } + if (isSuperUserOrAdmin) { + return CompletableFuture.completedFuture(true); + } else { + return checkAuthorization(topicName, role, AuthAction.produce); + } + }); } /** @@ -101,43 +112,57 @@ public CompletableFuture canProduceAsync(TopicName topicName, String ro @Override public CompletableFuture canConsumeAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData, String subscription) { - return pulsarResources.getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()) - .thenCompose(policies -> { - if (!policies.isPresent()) { - if (log.isDebugEnabled()) { - log.debug("Policies node couldn't be found for topic : {}", topicName); - } - } else { - if (isNotBlank(subscription)) { - // validate if role is authorized to access subscription. (skip validation if authorization - // list is empty) - Set roles = policies.get().auth_policies - .getSubscriptionAuthentication().get(subscription); - if (roles != null && !roles.isEmpty() && !roles.contains(role)) { - log.warn("[{}] is not authorized to subscribe on {}-{}", role, topicName, subscription); - return CompletableFuture.completedFuture(false); - } - - // validate if subscription-auth mode is configured - if (policies.get().subscription_auth_mode != null) { - switch (policies.get().subscription_auth_mode) { - case Prefix: - if (!subscription.startsWith(role)) { - PulsarServerException ex = new PulsarServerException(String.format( - "Failed to create consumer - The subscription name needs to be" - + " prefixed by the authentication role, like %s-xxxx for topic: %s", - role, topicName)); - return FutureUtil.failedFuture(ex); + return validateTenantAdminAccess(topicName.getTenant(), role, authenticationData).exceptionally(ex -> { + log.warn("Client with Role - {} failed to check tenant admin for topic - {}. {}", role, topicName, + ex.getMessage()); + return false; + }).thenComposeAsync(isSuperUserOrAdmin -> { + if (log.isDebugEnabled()) { + log.debug("Verify if role {} is allowed to consume topic {}: isSuperUserOrAdmin={}", + role, topicName, isSuperUserOrAdmin); + } + if (isSuperUserOrAdmin) { + return CompletableFuture.completedFuture(true); + } else { + return pulsarResources.getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()) + .thenCompose(policies -> { + if (!policies.isPresent()) { + if (log.isDebugEnabled()) { + log.debug("Policies node couldn't be found for topic : {}", topicName); + } + } else { + if (isNotBlank(subscription)) { + // validate if role is authorized to access subscription. (skip validation if authorization + // list is empty) + Set roles = policies.get().auth_policies + .getSubscriptionAuthentication().get(subscription); + if (roles != null && !roles.isEmpty() && !roles.contains(role)) { + log.warn("[{}] is not authorized to subscribe on {}-{}", role, topicName, subscription); + return CompletableFuture.completedFuture(false); + } + + // validate if subscription-auth mode is configured + if (policies.get().subscription_auth_mode != null) { + switch (policies.get().subscription_auth_mode) { + case Prefix: + if (!subscription.startsWith(role)) { + PulsarServerException ex = new PulsarServerException(String.format( + "Failed to create consumer - The subscription name needs to be" + + " prefixed by the authentication role, like %s-xxxx for topic: %s", + role, topicName)); + return FutureUtil.failedFuture(ex); + } + break; + default: + break; } - break; - default: - break; + } } } - } - } - return checkAuthorization(topicName, role, AuthAction.consume); - }); + return checkAuthorization(topicName, role, AuthAction.consume); + }); + } + }); } /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java index 58cf4ee418ea4..9869dd7cda587 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java @@ -82,7 +82,8 @@ public void simple() throws Exception { assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); admin.clusters().createCluster("c1", ClusterData.builder().build()); - admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1"))); + String tenantAdmin = "role1"; + admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet(tenantAdmin), Sets.newHashSet("c1"))); waitForChange(); admin.namespaces().createNamespace("p1/c1/ns1"); waitForChange(); @@ -215,21 +216,22 @@ public void simple() throws Exception { SubscriptionAuthMode.Prefix); waitForChange(); - assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null)); + assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null)); assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null)); - try { - assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null, "sub1")); - fail(); - } catch (Exception ignored) {} + // tenant admin can consume all topics, even if SubscriptionAuthMode.Prefix mode + assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null, "sub1")); try { assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null, "sub2")); fail(); } catch (Exception ignored) {} - assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null, "role1-sub1")); + assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null, "role1-sub1")); assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null, "role2-sub2")); assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "pulsar.super_user", null, "role3-sub1")); + // tenant admin can produce all topics + assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null)); + admin.namespaces().deleteNamespace("p1/c1/ns1"); admin.tenants().deleteTenant("p1"); admin.clusters().deleteCluster("c1"); From 65c7c4088f035b1c2578e4a21eb9d470be65ce3a Mon Sep 17 00:00:00 2001 From: druidliu Date: Wed, 12 Apr 2023 11:43:22 +0800 Subject: [PATCH 2/6] Optimize. --- .../PulsarAuthorizationProvider.java | 102 +++++++++--------- .../pulsar/broker/auth/AuthorizationTest.java | 6 +- 2 files changed, 56 insertions(+), 52 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 1913f874f571b..5beefdaf3d912 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -85,9 +85,9 @@ public void initialize(ServiceConfiguration conf, PulsarResources pulsarResource public CompletableFuture canProduceAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData) { return validateTenantAdminAccess(topicName.getTenant(), role, authenticationData) - .thenComposeAsync(isSuperUserOrAdmin -> { + .thenCompose(isSuperUserOrAdmin -> { if (log.isDebugEnabled()) { - log.debug("Verify if role {} is allowed to consume topic {}: isSuperUserOrAdmin={}", + log.debug("Verify if role {} is allowed to produce topic {}: isSuperUserOrAdmin={}", role, topicName, isSuperUserOrAdmin); } if (isSuperUserOrAdmin) { @@ -112,57 +112,59 @@ public CompletableFuture canProduceAsync(TopicName topicName, String ro @Override public CompletableFuture canConsumeAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData, String subscription) { - return validateTenantAdminAccess(topicName.getTenant(), role, authenticationData).exceptionally(ex -> { - log.warn("Client with Role - {} failed to check tenant admin for topic - {}. {}", role, topicName, - ex.getMessage()); - return false; - }).thenComposeAsync(isSuperUserOrAdmin -> { - if (log.isDebugEnabled()) { - log.debug("Verify if role {} is allowed to consume topic {}: isSuperUserOrAdmin={}", - role, topicName, isSuperUserOrAdmin); - } - if (isSuperUserOrAdmin) { - return CompletableFuture.completedFuture(true); - } else { - return pulsarResources.getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()) - .thenCompose(policies -> { - if (!policies.isPresent()) { - if (log.isDebugEnabled()) { - log.debug("Policies node couldn't be found for topic : {}", topicName); - } - } else { - if (isNotBlank(subscription)) { - // validate if role is authorized to access subscription. (skip validation if authorization - // list is empty) - Set roles = policies.get().auth_policies - .getSubscriptionAuthentication().get(subscription); - if (roles != null && !roles.isEmpty() && !roles.contains(role)) { - log.warn("[{}] is not authorized to subscribe on {}-{}", role, topicName, subscription); - return CompletableFuture.completedFuture(false); - } - - // validate if subscription-auth mode is configured - if (policies.get().subscription_auth_mode != null) { - switch (policies.get().subscription_auth_mode) { - case Prefix: - if (!subscription.startsWith(role)) { - PulsarServerException ex = new PulsarServerException(String.format( - "Failed to create consumer - The subscription name needs to be" - + " prefixed by the authentication role, like %s-xxxx for topic: %s", - role, topicName)); - return FutureUtil.failedFuture(ex); + return validateTenantAdminAccess(topicName.getTenant(), role, authenticationData) + .thenCompose(isSuperUserOrAdmin -> { + if (log.isDebugEnabled()) { + log.debug("Verify if role {} is allowed to consume topic {}: isSuperUserOrAdmin={}", + role, topicName, isSuperUserOrAdmin); + } + if (isSuperUserOrAdmin) { + return CompletableFuture.completedFuture(true); + } else { + return pulsarResources.getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()) + .thenCompose(policies -> { + if (!policies.isPresent()) { + if (log.isDebugEnabled()) { + log.debug("Policies node couldn't be found for topic : {}", topicName); + } + } else { + if (isNotBlank(subscription)) { + // validate if role is authorized to access subscription. + // (skip validation if authorization list is empty) + Set roles = policies.get().auth_policies + .getSubscriptionAuthentication().get(subscription); + if (roles != null && !roles.isEmpty() && !roles.contains(role)) { + log.warn("[{}] is not authorized to subscribe on {}-{}", + role, topicName, subscription); + return CompletableFuture.completedFuture(false); + } + + // validate if subscription-auth mode is configured + if (policies.get().subscription_auth_mode != null) { + switch (policies.get().subscription_auth_mode) { + case Prefix: + if (!subscription.startsWith(role)) { + PulsarServerException ex = + new PulsarServerException(String.format( + "Failed to create consumer - The " + + "subscription name needs to be" + + " prefixed by the " + + "authentication role, like " + + "%s-xxxx for topic: %s", + role, topicName)); + return FutureUtil.failedFuture(ex); + } + break; + default: + break; } - break; - default: - break; + } } } - } - } - return checkAuthorization(topicName, role, AuthAction.consume); - }); - } - }); + return checkAuthorization(topicName, role, AuthAction.consume); + }); + } + }); } /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java index 9869dd7cda587..1e47a9b52ef2a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java @@ -79,8 +79,10 @@ public void cleanup() throws Exception { public void simple() throws Exception { AuthorizationService auth = pulsar.getBrokerService().getAuthorizationService(); - assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); - + try { + assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); + fail("Should throw exception while tenant not exist"); + } catch (Exception ignored) {} admin.clusters().createCluster("c1", ClusterData.builder().build()); String tenantAdmin = "role1"; admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet(tenantAdmin), Sets.newHashSet("c1"))); From 2895b67249f4c06918e1dcad561ee2daf8589f99 Mon Sep 17 00:00:00 2001 From: druidliu Date: Wed, 12 Apr 2023 12:19:06 +0800 Subject: [PATCH 3/6] Fix test cases. --- .../pulsar/broker/auth/AuthorizationTest.java | 2 +- .../websocket/proxy/ProxyAuthorizationTest.java | 14 +++++++++++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java index 1e47a9b52ef2a..4fce7c50e1c44 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java @@ -81,7 +81,7 @@ public void simple() throws Exception { try { assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); - fail("Should throw exception while tenant not exist"); + fail("Should throw exception when tenant not exist"); } catch (Exception ignored) {} admin.clusters().createCluster("c1", ClusterData.builder().build()); String tenantAdmin = "role1"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java index a3b26a4a9d122..29327d3d4ebe1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import com.google.common.collect.Sets; import java.util.EnumSet; import java.util.Optional; @@ -83,10 +84,13 @@ protected void cleanup() throws Exception { public void test() throws Exception { AuthorizationService auth = service.getAuthorizationService(); - assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); - + try { + assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); + fail("Should throw exception when tenant not exist"); + } catch (Exception ignored) {} admin.clusters().createCluster(configClusterName, ClusterData.builder().build()); - admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1"))); + String tenantAdmin = "role1"; + admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet(tenantAdmin), Sets.newHashSet("c1"))); waitForChange(); admin.namespaces().createNamespace("p1/c1/ns1"); waitForChange(); @@ -117,6 +121,10 @@ public void test() throws Exception { assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null, null)); + // tenant admin can produce/consume all topics, even if SubscriptionAuthMode.Prefix mode + assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null, "sub1")); + assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null)); + admin.namespaces().deleteNamespace("p1/c1/ns1"); admin.tenants().deleteTenant("p1"); admin.clusters().deleteCluster("c1"); From a0caa9877cf6a9b69d169e999795ead6df2901d3 Mon Sep 17 00:00:00 2001 From: druidliu Date: Mon, 17 Apr 2023 12:46:04 +0800 Subject: [PATCH 4/6] Optimize. --- .../authorization/AuthorizationService.java | 6 +- .../PulsarAuthorizationProvider.java | 95 +++++++------------ 2 files changed, 37 insertions(+), 64 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index 706eadf0ec2dc..d7bf1bb5e4a7c 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -186,7 +186,7 @@ public CompletableFuture canProduceAsync(TopicName topicName, String ro if (isSuperUser) { return CompletableFuture.completedFuture(true); } else { - return provider.canProduceAsync(topicName, role, authenticationData); + return provider.allowTopicOperationAsync(topicName, role, TopicOperation.PRODUCE, authenticationData); } }); } @@ -211,7 +211,7 @@ public CompletableFuture canConsumeAsync(TopicName topicName, String ro if (isSuperUser) { return CompletableFuture.completedFuture(true); } else { - return provider.canConsumeAsync(topicName, role, authenticationData, subscription); + return provider.allowTopicOperationAsync(topicName, role, TopicOperation.CONSUME, authenticationData); } }); } @@ -293,7 +293,7 @@ public CompletableFuture canLookupAsync(TopicName topicName, String rol if (isSuperUser) { return CompletableFuture.completedFuture(true); } else { - return provider.canLookupAsync(topicName, role, authenticationData); + return provider.allowTopicOperationAsync(topicName, role, TopicOperation.LOOKUP, authenticationData); } }); } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 5beefdaf3d912..3f6d38194713e 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -84,18 +84,7 @@ public void initialize(ServiceConfiguration conf, PulsarResources pulsarResource @Override public CompletableFuture canProduceAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData) { - return validateTenantAdminAccess(topicName.getTenant(), role, authenticationData) - .thenCompose(isSuperUserOrAdmin -> { - if (log.isDebugEnabled()) { - log.debug("Verify if role {} is allowed to produce topic {}: isSuperUserOrAdmin={}", - role, topicName, isSuperUserOrAdmin); - } - if (isSuperUserOrAdmin) { - return CompletableFuture.completedFuture(true); - } else { - return checkAuthorization(topicName, role, AuthAction.produce); - } - }); + return checkAuthorization(topicName, role, AuthAction.produce); } /** @@ -112,58 +101,42 @@ public CompletableFuture canProduceAsync(TopicName topicName, String ro @Override public CompletableFuture canConsumeAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData, String subscription) { - return validateTenantAdminAccess(topicName.getTenant(), role, authenticationData) - .thenCompose(isSuperUserOrAdmin -> { - if (log.isDebugEnabled()) { - log.debug("Verify if role {} is allowed to consume topic {}: isSuperUserOrAdmin={}", - role, topicName, isSuperUserOrAdmin); - } - if (isSuperUserOrAdmin) { - return CompletableFuture.completedFuture(true); + return pulsarResources.getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()) + .thenCompose(policies -> { + if (!policies.isPresent()) { + if (log.isDebugEnabled()) { + log.debug("Policies node couldn't be found for topic : {}", topicName); + } } else { - return pulsarResources.getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()) - .thenCompose(policies -> { - if (!policies.isPresent()) { - if (log.isDebugEnabled()) { - log.debug("Policies node couldn't be found for topic : {}", topicName); - } - } else { - if (isNotBlank(subscription)) { - // validate if role is authorized to access subscription. - // (skip validation if authorization list is empty) - Set roles = policies.get().auth_policies - .getSubscriptionAuthentication().get(subscription); - if (roles != null && !roles.isEmpty() && !roles.contains(role)) { - log.warn("[{}] is not authorized to subscribe on {}-{}", - role, topicName, subscription); - return CompletableFuture.completedFuture(false); - } - - // validate if subscription-auth mode is configured - if (policies.get().subscription_auth_mode != null) { - switch (policies.get().subscription_auth_mode) { - case Prefix: - if (!subscription.startsWith(role)) { - PulsarServerException ex = - new PulsarServerException(String.format( - "Failed to create consumer - The " - + "subscription name needs to be" - + " prefixed by the " - + "authentication role, like " - + "%s-xxxx for topic: %s", - role, topicName)); - return FutureUtil.failedFuture(ex); - } - break; - default: - break; - } - } + if (isNotBlank(subscription)) { + // validate if role is authorized to access subscription. (skip validation if authorization + // list is empty) + Set roles = policies.get().auth_policies + .getSubscriptionAuthentication().get(subscription); + if (roles != null && !roles.isEmpty() && !roles.contains(role)) { + log.warn("[{}] is not authorized to subscribe on {}-{}", role, topicName, subscription); + return CompletableFuture.completedFuture(false); + } + + // validate if subscription-auth mode is configured + if (policies.get().subscription_auth_mode != null) { + switch (policies.get().subscription_auth_mode) { + case Prefix: + if (!subscription.startsWith(role)) { + PulsarServerException ex = new PulsarServerException(String.format( + "Failed to create consumer - The subscription name needs to be" + + " prefixed by the authentication role, like %s-xxxx for topic: %s", + role, topicName)); + return FutureUtil.failedFuture(ex); } - } - return checkAuthorization(topicName, role, AuthAction.consume); - }); + break; + default: + break; + } + } + } } + return checkAuthorization(topicName, role, AuthAction.consume); }); } From 6d4052e14014bd5d5c6a44c7c26315a6a61bac28 Mon Sep 17 00:00:00 2001 From: druidliu Date: Mon, 17 Apr 2023 20:41:08 +0800 Subject: [PATCH 5/6] Fix test error. --- .../authorization/AuthorizationService.java | 27 +++++-------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index d7bf1bb5e4a7c..124b9525fd6a0 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -28,6 +28,7 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; import org.apache.pulsar.broker.authentication.AuthenticationParameters; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.common.naming.NamespaceName; @@ -182,13 +183,7 @@ public CompletableFuture canProduceAsync(TopicName topicName, String ro if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> { - if (isSuperUser) { - return CompletableFuture.completedFuture(true); - } else { - return provider.allowTopicOperationAsync(topicName, role, TopicOperation.PRODUCE, authenticationData); - } - }); + return provider.allowTopicOperationAsync(topicName, role, TopicOperation.PRODUCE, authenticationData); } /** @@ -207,13 +202,9 @@ public CompletableFuture canConsumeAsync(TopicName topicName, String ro if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> { - if (isSuperUser) { - return CompletableFuture.completedFuture(true); - } else { - return provider.allowTopicOperationAsync(topicName, role, TopicOperation.CONSUME, authenticationData); - } - }); + + return provider.allowTopicOperationAsync(topicName, role, TopicOperation.CONSUME, + new AuthenticationDataSubscription(authenticationData, subscription)); } public boolean canProduce(TopicName topicName, String role, AuthenticationDataSource authenticationData) @@ -289,13 +280,7 @@ public CompletableFuture canLookupAsync(TopicName topicName, String rol if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> { - if (isSuperUser) { - return CompletableFuture.completedFuture(true); - } else { - return provider.allowTopicOperationAsync(topicName, role, TopicOperation.LOOKUP, authenticationData); - } - }); + return provider.allowTopicOperationAsync(topicName, role, TopicOperation.LOOKUP, authenticationData); } public CompletableFuture allowFunctionOpsAsync(NamespaceName namespaceName, String role, From 44bf9febb97925ee4f29207c1806cefafb0fc098 Mon Sep 17 00:00:00 2001 From: druidliu Date: Tue, 18 Apr 2023 15:31:09 +0800 Subject: [PATCH 6/6] Fix AuthorizationProducerConsumerTest. --- .../api/AuthorizationProducerConsumerTest.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java index 0ce3b7df07d1f..dd351286d2e5e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java @@ -1035,6 +1035,22 @@ public CompletableFuture canLookupAsync(TopicName topicName, String rol return CompletableFuture.completedFuture(grantRoles.contains(role)); } + @Override + public CompletableFuture allowTopicOperationAsync(TopicName topic, String role, + TopicOperation operation, + AuthenticationDataSource authData) { + switch (operation) { + + case PRODUCE: + return canProduceAsync(topic, role, authData); + case CONSUME: + return canConsumeAsync(topic, role, authData, authData.getSubscription()); + case LOOKUP: + return canLookupAsync(topic, role, authData); + } + return super.allowTopicOperationAsync(topic, role, operation, authData); + } + @Override public CompletableFuture grantPermissionAsync(NamespaceName namespace, Set actions, String role, String authData) {