From bc3346a9c4ae3df3df21be9ae33923574e736a9d Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Wed, 19 Apr 2023 10:44:11 -0500 Subject: [PATCH 1/3] Revert "[fix][broker] Fix tenant admin authorization bug. (#20068)" This reverts commit fc17c1d98a3c1edd975c131d174a9ef69887d9cd. --- .../authorization/AuthorizationService.java | 27 ++++++++++++++----- .../pulsar/broker/auth/AuthorizationTest.java | 22 +++++++-------- .../AuthorizationProducerConsumerTest.java | 16 ----------- .../proxy/ProxyAuthorizationTest.java | 14 +++------- 4 files changed, 33 insertions(+), 46 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index a9225f5e48fa9..0c61219b57a50 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -28,7 +28,6 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; -import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; import org.apache.pulsar.broker.authentication.AuthenticationParameters; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.common.naming.NamespaceName; @@ -183,7 +182,13 @@ public CompletableFuture canProduceAsync(TopicName topicName, String ro if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.allowTopicOperationAsync(topicName, role, TopicOperation.PRODUCE, authenticationData); + return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> { + if (isSuperUser) { + return CompletableFuture.completedFuture(true); + } else { + return provider.canProduceAsync(topicName, role, authenticationData); + } + }); } /** @@ -202,9 +207,13 @@ public CompletableFuture canConsumeAsync(TopicName topicName, String ro if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - - return provider.allowTopicOperationAsync(topicName, role, TopicOperation.CONSUME, - new AuthenticationDataSubscription(authenticationData, subscription)); + return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> { + if (isSuperUser) { + return CompletableFuture.completedFuture(true); + } else { + return provider.canConsumeAsync(topicName, role, authenticationData, subscription); + } + }); } public boolean canProduce(TopicName topicName, String role, AuthenticationDataSource authenticationData) @@ -280,7 +289,13 @@ public CompletableFuture canLookupAsync(TopicName topicName, String rol if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.allowTopicOperationAsync(topicName, role, TopicOperation.LOOKUP, authenticationData); + return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> { + if (isSuperUser) { + return CompletableFuture.completedFuture(true); + } else { + return provider.canLookupAsync(topicName, role, authenticationData); + } + }); } public CompletableFuture allowFunctionOpsAsync(NamespaceName namespaceName, String role, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java index 4fce7c50e1c44..58cf4ee418ea4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java @@ -79,13 +79,10 @@ public void cleanup() throws Exception { public void simple() throws Exception { AuthorizationService auth = pulsar.getBrokerService().getAuthorizationService(); - try { - assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); - fail("Should throw exception when tenant not exist"); - } catch (Exception ignored) {} + assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); + admin.clusters().createCluster("c1", ClusterData.builder().build()); - String tenantAdmin = "role1"; - admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet(tenantAdmin), Sets.newHashSet("c1"))); + admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1"))); waitForChange(); admin.namespaces().createNamespace("p1/c1/ns1"); waitForChange(); @@ -218,22 +215,21 @@ public void simple() throws Exception { SubscriptionAuthMode.Prefix); waitForChange(); - assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null)); + assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null)); assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null)); - // tenant admin can consume all topics, even if SubscriptionAuthMode.Prefix mode - assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null, "sub1")); + try { + assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null, "sub1")); + fail(); + } catch (Exception ignored) {} try { assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null, "sub2")); fail(); } catch (Exception ignored) {} - assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null, "role1-sub1")); + assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null, "role1-sub1")); assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null, "role2-sub2")); assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "pulsar.super_user", null, "role3-sub1")); - // tenant admin can produce all topics - assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null)); - admin.namespaces().deleteNamespace("p1/c1/ns1"); admin.tenants().deleteTenant("p1"); admin.clusters().deleteCluster("c1"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java index dd351286d2e5e..0ce3b7df07d1f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java @@ -1035,22 +1035,6 @@ public CompletableFuture canLookupAsync(TopicName topicName, String rol return CompletableFuture.completedFuture(grantRoles.contains(role)); } - @Override - public CompletableFuture allowTopicOperationAsync(TopicName topic, String role, - TopicOperation operation, - AuthenticationDataSource authData) { - switch (operation) { - - case PRODUCE: - return canProduceAsync(topic, role, authData); - case CONSUME: - return canConsumeAsync(topic, role, authData, authData.getSubscription()); - case LOOKUP: - return canLookupAsync(topic, role, authData); - } - return super.allowTopicOperationAsync(topic, role, operation, authData); - } - @Override public CompletableFuture grantPermissionAsync(NamespaceName namespace, Set actions, String role, String authData) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java index 29327d3d4ebe1..a3b26a4a9d122 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java @@ -25,7 +25,6 @@ import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; import com.google.common.collect.Sets; import java.util.EnumSet; import java.util.Optional; @@ -84,13 +83,10 @@ protected void cleanup() throws Exception { public void test() throws Exception { AuthorizationService auth = service.getAuthorizationService(); - try { - assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); - fail("Should throw exception when tenant not exist"); - } catch (Exception ignored) {} + assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); + admin.clusters().createCluster(configClusterName, ClusterData.builder().build()); - String tenantAdmin = "role1"; - admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet(tenantAdmin), Sets.newHashSet("c1"))); + admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1"))); waitForChange(); admin.namespaces().createNamespace("p1/c1/ns1"); waitForChange(); @@ -121,10 +117,6 @@ public void test() throws Exception { assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null, null)); - // tenant admin can produce/consume all topics, even if SubscriptionAuthMode.Prefix mode - assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null, "sub1")); - assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null)); - admin.namespaces().deleteNamespace("p1/c1/ns1"); admin.tenants().deleteTenant("p1"); admin.clusters().deleteCluster("c1"); From 11bea2962b0d19c89307bc91a6d089b7ebd41520 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Wed, 19 Apr 2023 11:05:07 -0500 Subject: [PATCH 2/3] [fix][broker] Producer/Consumer should call allowTopicOperationAsync --- .../java/org/apache/pulsar/broker/service/Consumer.java | 8 ++++++-- .../java/org/apache/pulsar/broker/service/Producer.java | 3 ++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 1ee3f513ef288..a3f9da41e6b35 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -43,6 +43,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.MessageId; @@ -56,6 +57,7 @@ import org.apache.pulsar.common.api.proto.MessageIdData; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; +import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.schema.SchemaType; @@ -901,8 +903,10 @@ public String toString() { public CompletableFuture checkPermissionsAsync() { TopicName topicName = TopicName.get(subscription.getTopicName()); if (cnx.getBrokerService().getAuthorizationService() != null) { - return cnx.getBrokerService().getAuthorizationService().canConsumeAsync(topicName, appId, - cnx.getAuthenticationData(), subscription.getName()) + AuthenticationDataSubscription authData = + new AuthenticationDataSubscription(cnx.getAuthenticationData(), subscription.getName()); + return cnx.getBrokerService().getAuthorizationService() + .allowTopicOperationAsync(topicName, TopicOperation.CONSUME, appId, authData) .handle((ok, e) -> { if (e != null) { log.warn("[{}] Get unexpected error while authorizing [{}] {}", appId, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 5b62e3261e64f..53b79f06e8e24 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -51,6 +51,7 @@ import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; +import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl; import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl; import org.apache.pulsar.common.protocol.Commands; @@ -781,7 +782,7 @@ public CompletableFuture checkPermissionsAsync() { TopicName topicName = TopicName.get(topic.getName()); if (cnx.getBrokerService().getAuthorizationService() != null) { return cnx.getBrokerService().getAuthorizationService() - .canProduceAsync(topicName, appId, cnx.getAuthenticationData()) + .allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, appId, cnx.getAuthenticationData()) .handle((ok, ex) -> { if (ex != null) { log.warn("[{}] Get unexpected error while autorizing [{}] {}", appId, topic.getName(), From f757419fd2f97d6b4c3db08be634205eef4c945d Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Wed, 19 Apr 2023 12:32:12 -0500 Subject: [PATCH 3/3] Revert "Revert "[fix][broker] Fix tenant admin authorization bug. (#20068)"" This reverts commit bc3346a9c4ae3df3df21be9ae33923574e736a9d. --- .../authorization/AuthorizationService.java | 27 +++++-------------- .../pulsar/broker/auth/AuthorizationTest.java | 22 ++++++++------- .../AuthorizationProducerConsumerTest.java | 16 +++++++++++ .../proxy/ProxyAuthorizationTest.java | 14 +++++++--- 4 files changed, 46 insertions(+), 33 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index 0c61219b57a50..a9225f5e48fa9 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -28,6 +28,7 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; import org.apache.pulsar.broker.authentication.AuthenticationParameters; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.common.naming.NamespaceName; @@ -182,13 +183,7 @@ public CompletableFuture canProduceAsync(TopicName topicName, String ro if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> { - if (isSuperUser) { - return CompletableFuture.completedFuture(true); - } else { - return provider.canProduceAsync(topicName, role, authenticationData); - } - }); + return provider.allowTopicOperationAsync(topicName, role, TopicOperation.PRODUCE, authenticationData); } /** @@ -207,13 +202,9 @@ public CompletableFuture canConsumeAsync(TopicName topicName, String ro if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> { - if (isSuperUser) { - return CompletableFuture.completedFuture(true); - } else { - return provider.canConsumeAsync(topicName, role, authenticationData, subscription); - } - }); + + return provider.allowTopicOperationAsync(topicName, role, TopicOperation.CONSUME, + new AuthenticationDataSubscription(authenticationData, subscription)); } public boolean canProduce(TopicName topicName, String role, AuthenticationDataSource authenticationData) @@ -289,13 +280,7 @@ public CompletableFuture canLookupAsync(TopicName topicName, String rol if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> { - if (isSuperUser) { - return CompletableFuture.completedFuture(true); - } else { - return provider.canLookupAsync(topicName, role, authenticationData); - } - }); + return provider.allowTopicOperationAsync(topicName, role, TopicOperation.LOOKUP, authenticationData); } public CompletableFuture allowFunctionOpsAsync(NamespaceName namespaceName, String role, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java index 58cf4ee418ea4..4fce7c50e1c44 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java @@ -79,10 +79,13 @@ public void cleanup() throws Exception { public void simple() throws Exception { AuthorizationService auth = pulsar.getBrokerService().getAuthorizationService(); - assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); - + try { + assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); + fail("Should throw exception when tenant not exist"); + } catch (Exception ignored) {} admin.clusters().createCluster("c1", ClusterData.builder().build()); - admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1"))); + String tenantAdmin = "role1"; + admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet(tenantAdmin), Sets.newHashSet("c1"))); waitForChange(); admin.namespaces().createNamespace("p1/c1/ns1"); waitForChange(); @@ -215,21 +218,22 @@ public void simple() throws Exception { SubscriptionAuthMode.Prefix); waitForChange(); - assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null)); + assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null)); assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null)); - try { - assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null, "sub1")); - fail(); - } catch (Exception ignored) {} + // tenant admin can consume all topics, even if SubscriptionAuthMode.Prefix mode + assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null, "sub1")); try { assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null, "sub2")); fail(); } catch (Exception ignored) {} - assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null, "role1-sub1")); + assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null, "role1-sub1")); assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null, "role2-sub2")); assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "pulsar.super_user", null, "role3-sub1")); + // tenant admin can produce all topics + assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null)); + admin.namespaces().deleteNamespace("p1/c1/ns1"); admin.tenants().deleteTenant("p1"); admin.clusters().deleteCluster("c1"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java index 0ce3b7df07d1f..dd351286d2e5e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java @@ -1035,6 +1035,22 @@ public CompletableFuture canLookupAsync(TopicName topicName, String rol return CompletableFuture.completedFuture(grantRoles.contains(role)); } + @Override + public CompletableFuture allowTopicOperationAsync(TopicName topic, String role, + TopicOperation operation, + AuthenticationDataSource authData) { + switch (operation) { + + case PRODUCE: + return canProduceAsync(topic, role, authData); + case CONSUME: + return canConsumeAsync(topic, role, authData, authData.getSubscription()); + case LOOKUP: + return canLookupAsync(topic, role, authData); + } + return super.allowTopicOperationAsync(topic, role, operation, authData); + } + @Override public CompletableFuture grantPermissionAsync(NamespaceName namespace, Set actions, String role, String authData) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java index a3b26a4a9d122..29327d3d4ebe1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import com.google.common.collect.Sets; import java.util.EnumSet; import java.util.Optional; @@ -83,10 +84,13 @@ protected void cleanup() throws Exception { public void test() throws Exception { AuthorizationService auth = service.getAuthorizationService(); - assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); - + try { + assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); + fail("Should throw exception when tenant not exist"); + } catch (Exception ignored) {} admin.clusters().createCluster(configClusterName, ClusterData.builder().build()); - admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1"))); + String tenantAdmin = "role1"; + admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet(tenantAdmin), Sets.newHashSet("c1"))); waitForChange(); admin.namespaces().createNamespace("p1/c1/ns1"); waitForChange(); @@ -117,6 +121,10 @@ public void test() throws Exception { assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null, null)); + // tenant admin can produce/consume all topics, even if SubscriptionAuthMode.Prefix mode + assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null, "sub1")); + assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null)); + admin.namespaces().deleteNamespace("p1/c1/ns1"); admin.tenants().deleteTenant("p1"); admin.clusters().deleteCluster("c1");