Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.authentication.AuthenticationParameters;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -182,13 +183,7 @@ public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String ro
if (!this.conf.isAuthorizationEnabled()) {
return CompletableFuture.completedFuture(true);
}
return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> {
if (isSuperUser) {
return CompletableFuture.completedFuture(true);
} else {
return provider.canProduceAsync(topicName, role, authenticationData);
}
});
return provider.allowTopicOperationAsync(topicName, role, TopicOperation.PRODUCE, authenticationData);
}

/**
Expand All @@ -207,13 +202,9 @@ public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String ro
if (!this.conf.isAuthorizationEnabled()) {
return CompletableFuture.completedFuture(true);
}
return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> {
if (isSuperUser) {
return CompletableFuture.completedFuture(true);
} else {
return provider.canConsumeAsync(topicName, role, authenticationData, subscription);
}
});

return provider.allowTopicOperationAsync(topicName, role, TopicOperation.CONSUME,
new AuthenticationDataSubscription(authenticationData, subscription));
}

public boolean canProduce(TopicName topicName, String role, AuthenticationDataSource authenticationData)
Expand Down Expand Up @@ -289,13 +280,7 @@ public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String rol
if (!this.conf.isAuthorizationEnabled()) {
return CompletableFuture.completedFuture(true);
}
return provider.isSuperUser(role, authenticationData, conf).thenComposeAsync(isSuperUser -> {
if (isSuperUser) {
return CompletableFuture.completedFuture(true);
} else {
return provider.canLookupAsync(topicName, role, authenticationData);
}
});
return provider.allowTopicOperationAsync(topicName, role, TopicOperation.LOOKUP, authenticationData);
}

public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,13 @@ public void cleanup() throws Exception {
public void simple() throws Exception {
AuthorizationService auth = pulsar.getBrokerService().getAuthorizationService();

assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));

try {
assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
fail("Should throw exception when tenant not exist");
} catch (Exception ignored) {}
admin.clusters().createCluster("c1", ClusterData.builder().build());
admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1")));
String tenantAdmin = "role1";
admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet(tenantAdmin), Sets.newHashSet("c1")));
waitForChange();
admin.namespaces().createNamespace("p1/c1/ns1");
waitForChange();
Expand Down Expand Up @@ -215,21 +218,22 @@ public void simple() throws Exception {
SubscriptionAuthMode.Prefix);
waitForChange();

assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null));
assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null));
assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null));
try {
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null, "sub1"));
fail();
} catch (Exception ignored) {}
// tenant admin can consume all topics, even if SubscriptionAuthMode.Prefix mode
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null, "sub1"));
try {
assertFalse(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null, "sub2"));
fail();
} catch (Exception ignored) {}

assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null, "role1-sub1"));
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null, "role1-sub1"));
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null, "role2-sub2"));
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "pulsar.super_user", null, "role3-sub1"));

// tenant admin can produce all topics
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null));

admin.namespaces().deleteNamespace("p1/c1/ns1");
admin.tenants().deleteTenant("p1");
admin.clusters().deleteCluster("c1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1035,6 +1035,22 @@ public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String rol
return CompletableFuture.completedFuture(grantRoles.contains(role));
}

@Override
public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topic, String role,
TopicOperation operation,
AuthenticationDataSource authData) {
switch (operation) {

case PRODUCE:
return canProduceAsync(topic, role, authData);
case CONSUME:
return canConsumeAsync(topic, role, authData, authData.getSubscription());
case LOOKUP:
return canLookupAsync(topic, role, authData);
}
return super.allowTopicOperationAsync(topic, role, operation, authData);
}

@Override
public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions,
String role, String authData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.util.EnumSet;
import java.util.Optional;
Expand Down Expand Up @@ -83,10 +84,13 @@ protected void cleanup() throws Exception {
public void test() throws Exception {
AuthorizationService auth = service.getAuthorizationService();

assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));

try {
assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
fail("Should throw exception when tenant not exist");
} catch (Exception ignored) {}
admin.clusters().createCluster(configClusterName, ClusterData.builder().build());
admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1")));
String tenantAdmin = "role1";
admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet(tenantAdmin), Sets.newHashSet("c1")));
waitForChange();
admin.namespaces().createNamespace("p1/c1/ns1");
waitForChange();
Expand Down Expand Up @@ -117,6 +121,10 @@ public void test() throws Exception {
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null, null));

// tenant admin can produce/consume all topics, even if SubscriptionAuthMode.Prefix mode
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null, "sub1"));
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), tenantAdmin, null));

admin.namespaces().deleteNamespace("p1/c1/ns1");
admin.tenants().deleteTenant("p1");
admin.clusters().deleteCluster("c1");
Expand Down