denyPatternsPrefixed, String allowStr) {
+ try {
+ String lastKey = denyPatternsPrefixed.headMap(allowStr).lastKey();
+ return allowStr.startsWith(lastKey);
+ } catch (NoSuchElementException e) {
+ return false;
+ }
+ }
}
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 1f29c5031fa29..b4bd6bbded463 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -57,6 +57,7 @@ versions += [
caffeine: "3.2.0",
bndlib: "7.1.0",
checkstyle: project.hasProperty('checkstyleVersion') ? checkstyleVersion : "12.3.1",
+ commonsCollection: "4.5.0",
commonsValidator: "1.10.1",
classgraph: "4.8.179",
gradle: "9.4.1",
@@ -158,6 +159,7 @@ libs += [
bndlib:"biz.aQute.bnd:biz.aQute.bndlib:$versions.bndlib",
caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
+ commonsCollection: "org.apache.commons:commons-collections4:$versions.commonsCollection",
commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:$versions.jacksonAnnotations",
jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizeByResourceTypeSearch.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizeByResourceTypeSearch.java
new file mode 100644
index 0000000000000..bca27823ae6c0
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizeByResourceTypeSearch.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.acl;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.*;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.internals.PluginMetricsImpl;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.metadata.authorizer.StandardAcl;
+import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.TimeValue;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Benchmark (resourceNamePrefix) Mode Cnt Score Error Units
+ * AuthorizeByResourceTypeSearch.testAuthorizeByResourceType AuthorizeByResourceTypeSearch- avgt 7 4.252 ± 0.024 us/op
+ * AuthorizeByResourceTypeSearch.testAuthorizeByResourceType Authorize...Check- avgt 7 4.301 ± 0.030 us/op
+ * AuthorizeByResourceTypeSearch.testAuthorizeByResourceType Authorize...Difference- avgt 7 4.592 ± 0.042 us/op
+ *
+ *
+ */
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class AuthorizeByResourceTypeSearch {
+
+ @Param({
+ "AuthorizeByResourceTypeSearch-",
+ "AuthorizeByResourceTypeSearchOneMoreWordLongForDenyPatternCheck-",
+ "AuthorizeByResourceTypeSearchOneMoreWordLongForDenyPatternCheckAndWeAddOneMoreWordJustInCaseToShowBenchmarkDifference-",
+ })
+ String resourceNamePrefix;
+
+ @Param({"4","10"})
+ int typeOfPrefixedAndLiteralPattern;
+
+ private KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
+ private String authorizeByResourceTypeHostName = "127.0.0.2";
+ private StandardAuthorizer authorizer;
+ private RequestContext authorizeByResourceTypeContext;
+ private AclBindingFilter filter;
+ private AclOperation op;
+ private ResourceType resourceType;
+
+ @Setup(Level.Trial)
+ public void setup() throws Exception {
+ authorizer = new StandardAuthorizer();
+ filter = AclBindingFilter.ANY;
+ op = AclOperation.READ;
+ resourceType = ResourceType.TOPIC;
+ prepareAclCache();
+ authorizeByResourceTypeContext = new RequestContext(new RequestHeader(ApiKeys.PRODUCE, Integer.valueOf(1).shortValue(),
+ "someclient", 1), "1", InetAddress.getByName(authorizeByResourceTypeHostName), principal,
+ ListenerName.normalised("listener"), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false);
+ }
+
+
+ /**
+ * What we do in this test:
+ *
+ * For every Allow Literal Pattern -- iterate on count of typeOfPrefixedAndLiteralPattern
+ * check in denyPatternsLiteral map -- check on exists then no found
+ * check in denyPatternsPrefixed -- find in denyPatternsPrefixed prefix
+ * return if allow (in this test we don't find)
+ *
+ * for every Allow Prefix pattern
+ * check in denyPatternsPrefixed -- find in denyPatternsPrefixed prefix
+ * return allow
+ *
+ * return deny - never reach
+ *
+ * We iterate
+ * typeOfPrefixedAndLiteralPattern + 1 counts.
+ * Make (typeOfPrefixedAndLiteralPattern + 1) * 2 search in PatriciaTrie
+ */
+ private void prepareAclCache() {
+ Map> aclEntries = new HashMap<>();
+
+ String prefix = "a";
+
+ List patterns = new ArrayList<>();
+ for(int i = 0; i< typeOfPrefixedAndLiteralPattern; i++) {
+ patterns.add(resourceNamePrefix + prefix.repeat(i+1));
+ }
+
+ String allowPattern = resourceNamePrefix;
+
+ for(String pattern : patterns) {
+ // PREFIX DENY
+ makeDeny(pattern, aclEntries, PatternType.PREFIXED);
+ // ALLOW LITERAL
+ makeAllow(pattern, aclEntries, PatternType.LITERAL);
+ }
+
+ // Add one Allow
+ makeAllow(allowPattern, aclEntries, PatternType.PREFIXED);
+
+
+ // makeDeny(denyPattern1, aclEntries);
+ setupAcls(aclEntries);
+ }
+
+ private void makeDeny(String denyPattern, Map> aclEntries, PatternType patternType) {
+ ResourcePattern resource = new ResourcePattern(ResourceType.TOPIC, denyPattern,
+ patternType);
+
+ Set entries = aclEntries.computeIfAbsent(resource, k -> new HashSet<>());
+
+ AccessControlEntry denyAce = new AccessControlEntry(principal.toString(), authorizeByResourceTypeHostName,
+ AclOperation.READ, AclPermissionType.DENY);
+
+ entries.add(denyAce);
+ }
+
+ private void makeAllow(String denyPattern, Map> aclEntries, PatternType patternType) {
+ ResourcePattern resourceAllow = new ResourcePattern(ResourceType.TOPIC, denyPattern,
+ patternType);
+
+ Set entriesAllow = aclEntries.computeIfAbsent(resourceAllow, k -> new HashSet<>());
+
+ AccessControlEntry allowAce = new AccessControlEntry(principal.toString(), authorizeByResourceTypeHostName,
+ AclOperation.READ, AclPermissionType.ALLOW);
+
+ entriesAllow.add(allowAce);
+ }
+
+ private void setupAcls(Map> aclEntries) {
+ for (Map.Entry> entryMap : aclEntries.entrySet()) {
+ ResourcePattern resourcePattern = entryMap.getKey();
+
+ for (AccessControlEntry accessControlEntry : entryMap.getValue()) {
+ StandardAcl standardAcl = StandardAcl.fromAclBinding(new AclBinding(resourcePattern, accessControlEntry));
+ authorizer.addAcl(Uuid.randomUuid(), standardAcl);
+ }
+ authorizer.completeInitialLoad();
+
+ }
+ }
+
+ @Setup(Level.Iteration)
+ public void setupIteration() {
+ authorizer.withPluginMetrics(new PluginMetricsImpl(new Metrics(), new HashMap<>(1000000)));
+ }
+
+ @TearDown(Level.Trial)
+ public void tearDown() throws IOException {
+ authorizer.close();
+ }
+
+ @Benchmark
+ public AuthorizationResult testAuthorizeByResourceType() {
+ return authorizer.authorizeByResourceType(authorizeByResourceTypeContext, op, resourceType);
+ }
+
+ public static void main(String[] args) {
+ Options opt = new OptionsBuilder()
+ .include(AuthorizeByResourceTypeSearch.class.getSimpleName())
+ .warmupIterations(7)
+ .warmupTime(TimeValue.seconds(1))
+ .measurementIterations(7)
+ .measurementTime(TimeValue.seconds(1))
+ .timeUnit(TimeUnit.MICROSECONDS)
+ .forks(1)
+ .build();
+ try {
+ new Runner(opt).run();
+ } catch (RunnerException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java
index 6d4536301f576..dfa9d914c6bd5 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java
@@ -23,6 +23,8 @@
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.internals.PluginMetricsImpl;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -197,6 +199,11 @@ private Boolean shouldDeny() {
return rand.nextDouble() * 100.0 - eps < denyPercentage;
}
+ @Setup(Level.Iteration)
+ public void setupIteration() {
+ authorizer.withPluginMetrics(new PluginMetricsImpl(new Metrics(), new HashMap<>(1000000)));
+ }
+
@TearDown(Level.Trial)
public void tearDown() throws IOException {
authorizer.close();
diff --git a/server/src/test/java/org/apache/kafka/security/authorizer/AuthorizerTest.java b/server/src/test/java/org/apache/kafka/security/authorizer/AuthorizerTest.java
index 926d185548da3..ee684ec610bb0 100644
--- a/server/src/test/java/org/apache/kafka/security/authorizer/AuthorizerTest.java
+++ b/server/src/test/java/org/apache/kafka/security/authorizer/AuthorizerTest.java
@@ -234,6 +234,29 @@ public void testAuthorizeByResourceTypeIsolationUnrelatedDenyWontDominateAllow()
"User1 from host2 should have READ access to at least one topic");
}
+ @Test
+ public void testAuthorizeByResourceTypeLiteralResourceDenyDominate() throws Exception {
+ KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1");
+ InetAddress host1 = InetAddress.getByName("192.168.1.1");
+
+ ResourcePattern abcd = new ResourcePattern(GROUP, "abcd", PREFIXED);
+ ResourcePattern abcde = new ResourcePattern(GROUP, "abcde", LITERAL);
+
+ RequestContext u1h1Context = newRequestContext(user1, host1);
+
+ AccessControlEntry allowAce = new AccessControlEntry(user1.toString(), host1.getHostAddress(), READ, ALLOW);
+ AccessControlEntry denyAce = new AccessControlEntry(user1.toString(), host1.getHostAddress(), READ, DENY);
+
+ addAcls(authorizer, Set.of(allowAce), abcde);
+
+ assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ, ResourceType.GROUP),
+ "User1 from host1 should have READ access to at least one group");
+
+ addAcls(authorizer, Set.of(denyAce), abcd);
+ assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, ResourceType.GROUP),
+ "User1 from host1 now should not have READ access to any group");
+ }
+
@Test
public void testAuthorizeByResourceTypeDenyTakesPrecedence() throws Exception {
KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1");