From 2bb16b48c85e0f85a27672ecc6c9d1fcc5537dd9 Mon Sep 17 00:00:00 2001 From: Guang Zhao Date: Tue, 18 Nov 2025 13:43:25 +1100 Subject: [PATCH 1/7] Add apache commons collection 4 as dependency --- build.gradle | 1 + gradle/dependencies.gradle | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index a1768cfac66dd..b7d5fb05800e9 100644 --- a/build.gradle +++ b/build.gradle @@ -1858,6 +1858,7 @@ project(':clients') { implementation libs.opentelemetryProto implementation libs.protobuf implementation libs.slf4jApi + implementation libs.commonsCollections4 // libraries which should be added as runtime dependencies in generated pom.xml should be defined here: shadowed libs.zstd diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 3e6664c17ed90..d0e2546124cab 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -131,7 +131,8 @@ versions += [ zstd: "1.5.6-10", junitPlatform: "1.13.1", hdrHistogram: "2.2.2", - hash4j: "0.22.0" + hash4j: "0.22.0", + commonsCollections4: "4.4" ] libs += [ @@ -234,4 +235,5 @@ libs += [ httpclient: "org.apache.httpcomponents:httpclient:$versions.httpclient", hdrHistogram: "org.hdrhistogram:HdrHistogram:$versions.hdrHistogram", hash4j: "com.dynatrace.hash4j:hash4j:$versions.hash4j", + commonsCollections4: "org.apache.commons:commons-collections4:$versions.commonsCollections4", ] From 33493ef07e0d6bb032b57bb164d7af96dcdf1554 Mon Sep 17 00:00:00 2001 From: Guang Zhao Date: Tue, 18 Nov 2025 19:31:05 +1100 Subject: [PATCH 2/7] Use trie as set of strings --- .../apache/kafka/common/utils/TrieSet.java | 118 ++++++++++++++++++ .../kafka/server/authorizer/Authorizer.java | 29 ++--- 2 files changed, 130 insertions(+), 17 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/utils/TrieSet.java diff --git a/clients/src/main/java/org/apache/kafka/common/utils/TrieSet.java b/clients/src/main/java/org/apache/kafka/common/utils/TrieSet.java new file mode 100644 index 0000000000000..d2ff5528b98ee --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/utils/TrieSet.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.utils; + +import org.apache.commons.collections4.trie.PatriciaTrie; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; + +public class TrieSet implements Set { + private final PatriciaTrie trie; + + public TrieSet() { + trie = new PatriciaTrie<>(); + } + + public Set prefixSet(String key) { + return trie.prefixMap(key).keySet(); + } + + @Override + public int size() { + return trie.size(); + } + + @Override + public boolean isEmpty() { + return trie.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return trie.containsKey(o); + } + + @Override + public Iterator iterator() { + return trie.keySet().iterator(); + } + + @Override + public Object[] toArray() { + return trie.keySet().toArray(); + } + + @Override + public T[] toArray(T[] a) { + return trie.keySet().toArray(a); + } + + @Override + public boolean add(String s) { + Objects.requireNonNull(s, "Expect non-null element to add"); + return trie.putIfAbsent(s, s) == null; + } + + @Override + public boolean remove(Object o) { + return trie.remove(o) != null; + } + + @Override + public boolean containsAll(Collection c) { + for (final Object k : c) { + if (!trie.containsKey(k)) + return false; + } + return true; + } + + @Override + public boolean addAll(Collection c) { + boolean mutated = false; + for (final String k : c) + mutated |= add(k); + return mutated; + } + + @Override + public boolean retainAll(Collection c) { + boolean mutated = false; + for (final Object k : c) { + if (!contains(k)) + mutated |= remove(k); + } + return mutated; + } + + @Override + public boolean removeAll(Collection c) { + boolean mutated = false; + for (final Object k : c) + mutated |= remove(k); + return mutated; + } + + @Override + public void clear() { + trie.clear(); + } +} diff --git a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java index 68f1aaf678d1d..0a83b3dd30ff2 100644 --- a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java +++ b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java @@ -30,8 +30,10 @@ import org.apache.kafka.common.resource.ResourceType; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.utils.SecurityUtils; +import org.apache.kafka.common.utils.TrieSet; import java.io.Closeable; +import java.util.ArrayList; import java.util.Collections; import java.util.EnumMap; import java.util.HashSet; @@ -207,8 +209,8 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), }}; EnumMap> allowPatterns = new EnumMap<>(PatternType.class) {{ - put(PatternType.LITERAL, new HashSet<>()); - put(PatternType.PREFIXED, new HashSet<>()); + put(PatternType.LITERAL, new TrieSet()); + put(PatternType.PREFIXED, new TrieSet()); }}; boolean hasWildCardAllow = false; @@ -271,22 +273,15 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), // For any literal allowed, if there's no dominant literal and prefix denied, return allow. // For any prefix allowed, if there's no dominant prefix denied, return allow. for (Map.Entry> entry : allowPatterns.entrySet()) { - for (String allowStr : entry.getValue()) { - if (entry.getKey() == PatternType.LITERAL - && denyPatterns.get(PatternType.LITERAL).contains(allowStr)) - continue; - StringBuilder sb = new StringBuilder(); - boolean hasDominatedDeny = false; - for (char ch : allowStr.toCharArray()) { - sb.append(ch); - if (denyPatterns.get(PatternType.PREFIXED).contains(sb.toString())) { - hasDominatedDeny = true; - break; - } - } - if (!hasDominatedDeny) - return AuthorizationResult.ALLOWED; + TrieSet toAllow = (TrieSet) entry.getValue(); + if (entry.getKey() == PatternType.LITERAL) + toAllow.removeAll(denyPatterns.get(PatternType.LITERAL)); + for (final String d : denyPatterns.get(PatternType.PREFIXED)) { + List toDeny = new ArrayList<>(toAllow.prefixSet(d)); + toAllow.removeAll(toDeny); } + if (!toAllow.isEmpty()) + return AuthorizationResult.ALLOWED; } return AuthorizationResult.DENIED; From b0d876ece0d36a1b3e7137668c9f16e79fc0f66c Mon Sep 17 00:00:00 2001 From: Guang Zhao Date: Tue, 18 Nov 2025 19:32:26 +1100 Subject: [PATCH 3/7] Allow apache commons collections as package --- checkstyle/import-control.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index dd5158b0ed51e..e80dec3b89fb8 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -201,6 +201,7 @@ + @@ -253,6 +254,10 @@ + + + + From af91c077521d49f7b2ce03193f7364d6b46f029f Mon Sep 17 00:00:00 2001 From: Guang Zhao Date: Tue, 18 Nov 2025 20:27:34 +1100 Subject: [PATCH 4/7] Add test and doc --- .../apache/kafka/common/utils/TrieSet.java | 28 +- .../kafka/server/authorizer/Authorizer.java | 6 +- .../kafka/common/utils/TrieSetTest.java | 450 ++++++++++++++++++ 3 files changed, 478 insertions(+), 6 deletions(-) create mode 100644 clients/src/test/java/org/apache/kafka/common/utils/TrieSetTest.java diff --git a/clients/src/main/java/org/apache/kafka/common/utils/TrieSet.java b/clients/src/main/java/org/apache/kafka/common/utils/TrieSet.java index d2ff5528b98ee..4fdcb70a27b01 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/TrieSet.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/TrieSet.java @@ -24,6 +24,15 @@ import java.util.Objects; import java.util.Set; +/** + * A set of strings backed by a PATRICIA trie. + *

+ * A {@link org.apache.commons.collections4.trie.PatriciaTrie} + * (Practical Algorithm to Retrieve Information + * Coded in Alphanumeric) implements efficient worst-case O(K)-time operations, where K is the max key bit-length in the + * tree. + *

+ */ public class TrieSet implements Set { private final PatriciaTrie trie; @@ -31,6 +40,14 @@ public TrieSet() { trie = new PatriciaTrie<>(); } + /** + * Get a set view of all strings with the same prefix. + *

+ * The view is backed by the trie. If you want to modify the trie while iterating the view, create a copy first. + *

+ * @param key the prefix to search for + * @return a set view of all strings with the given prefix + */ public Set prefixSet(String key) { return trie.prefixMap(key).keySet(); } @@ -62,6 +79,7 @@ public Object[] toArray() { @Override public T[] toArray(T[] a) { + Objects.requireNonNull(a); return trie.keySet().toArray(a); } @@ -96,9 +114,13 @@ public boolean addAll(Collection c) { @Override public boolean retainAll(Collection c) { boolean mutated = false; - for (final Object k : c) { - if (!contains(k)) - mutated |= remove(k); + Iterator it = iterator(); + while (it.hasNext()) { + String element = it.next(); + if (!c.contains(element)) { + it.remove(); + mutated = true; + } } return mutated; } diff --git a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java index 0a83b3dd30ff2..a9d8ebb5c15f3 100644 --- a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java +++ b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java @@ -207,7 +207,7 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), put(PatternType.LITERAL, new HashSet<>()); put(PatternType.PREFIXED, new HashSet<>()); }}; - EnumMap> allowPatterns = + EnumMap allowPatterns = new EnumMap<>(PatternType.class) {{ put(PatternType.LITERAL, new TrieSet()); put(PatternType.PREFIXED, new TrieSet()); @@ -272,8 +272,8 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), // For any literal allowed, if there's no dominant literal and prefix denied, return allow. // For any prefix allowed, if there's no dominant prefix denied, return allow. - for (Map.Entry> entry : allowPatterns.entrySet()) { - TrieSet toAllow = (TrieSet) entry.getValue(); + for (Map.Entry entry : allowPatterns.entrySet()) { + TrieSet toAllow = entry.getValue(); if (entry.getKey() == PatternType.LITERAL) toAllow.removeAll(denyPatterns.get(PatternType.LITERAL)); for (final String d : denyPatterns.get(PatternType.PREFIXED)) { diff --git a/clients/src/test/java/org/apache/kafka/common/utils/TrieSetTest.java b/clients/src/test/java/org/apache/kafka/common/utils/TrieSetTest.java new file mode 100644 index 0000000000000..58e09aecc5db6 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/utils/TrieSetTest.java @@ -0,0 +1,450 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TrieSetTest { + + private TrieSet trieSet; + + @BeforeEach + public void setUp() { + trieSet = new TrieSet(); + } + + @Test + public void testEmptySet() { + assertTrue(trieSet.isEmpty()); + assertEquals(0, trieSet.size()); + } + + @Test + public void testAddSingleElement() { + assertTrue(trieSet.add("test")); + assertFalse(trieSet.isEmpty()); + assertEquals(1, trieSet.size()); + assertTrue(trieSet.contains("test")); + } + + @Test + public void testAddDuplicateElement() { + assertTrue(trieSet.add("test")); + assertFalse(trieSet.add("test")); + assertEquals(1, trieSet.size()); + } + + @Test + public void testAddNullElement() { + assertThrows(NullPointerException.class, () -> trieSet.add(null)); + } + + @Test + public void testContains() { + trieSet.add("apple"); + trieSet.add("application"); + trieSet.add("apply"); + + assertTrue(trieSet.contains("apple")); + assertTrue(trieSet.contains("application")); + assertTrue(trieSet.contains("apply")); + assertFalse(trieSet.contains("app")); + assertFalse(trieSet.contains("apples")); + } + + @Test + public void testRemove() { + trieSet.add("test"); + assertTrue(trieSet.remove("test")); + assertFalse(trieSet.contains("test")); + assertEquals(0, trieSet.size()); + assertTrue(trieSet.isEmpty()); + } + + @Test + public void testRemoveNonExistent() { + trieSet.add("test"); + assertFalse(trieSet.remove("nonexistent")); + assertEquals(1, trieSet.size()); + } + + @Test + public void testClear() { + trieSet.add("test1"); + trieSet.add("test2"); + trieSet.add("test3"); + + assertEquals(3, trieSet.size()); + trieSet.clear(); + assertEquals(0, trieSet.size()); + assertTrue(trieSet.isEmpty()); + } + + @Test + public void testAddAll() { + Set elements = new HashSet<>(Arrays.asList("apple", "banana", "cherry")); + assertTrue(trieSet.addAll(elements)); + assertEquals(3, trieSet.size()); + assertTrue(trieSet.contains("apple")); + assertTrue(trieSet.contains("banana")); + assertTrue(trieSet.contains("cherry")); + } + + @Test + public void testAddAllEmpty() { + Set elements = new HashSet<>(); + assertFalse(trieSet.addAll(elements)); + assertEquals(0, trieSet.size()); + } + + @Test + public void testAddAllWithExistingElements() { + trieSet.add("apple"); + Set elements = new HashSet<>(Arrays.asList("apple", "banana")); + assertTrue(trieSet.addAll(elements)); + assertEquals(2, trieSet.size()); + } + + @Test + public void testRemoveAll() { + trieSet.add("apple"); + trieSet.add("banana"); + trieSet.add("cherry"); + + Set toRemove = new HashSet<>(Arrays.asList("apple", "cherry")); + assertTrue(trieSet.removeAll(toRemove)); + assertEquals(1, trieSet.size()); + assertTrue(trieSet.contains("banana")); + assertFalse(trieSet.contains("apple")); + assertFalse(trieSet.contains("cherry")); + } + + @Test + public void testRemoveAllNonExistent() { + trieSet.add("apple"); + Set toRemove = new HashSet<>(Arrays.asList("banana", "cherry")); + assertFalse(trieSet.removeAll(toRemove)); + assertEquals(1, trieSet.size()); + assertTrue(trieSet.contains("apple")); + } + + @Test + public void testContainsAll() { + trieSet.add("apple"); + trieSet.add("banana"); + trieSet.add("cherry"); + + Set subset = new HashSet<>(Arrays.asList("apple", "banana")); + assertTrue(trieSet.containsAll(subset)); + + Set notSubset = new HashSet<>(Arrays.asList("apple", "durian")); + assertFalse(trieSet.containsAll(notSubset)); + } + + @Test + public void testRetainAll() { + trieSet.add("apple"); + trieSet.add("banana"); + trieSet.add("cherry"); + trieSet.add("durian"); + + Set toRetain = new HashSet<>(Arrays.asList("apple", "cherry", "elderberry")); + assertTrue(trieSet.retainAll(toRetain)); + assertEquals(2, trieSet.size()); + assertTrue(trieSet.contains("apple")); + assertTrue(trieSet.contains("cherry")); + assertFalse(trieSet.contains("banana")); + assertFalse(trieSet.contains("durian")); + } + + @Test + public void testRetainAllNoChange() { + trieSet.add("apple"); + trieSet.add("banana"); + + Set toRetain = new HashSet<>(Arrays.asList("apple", "banana", "cherry")); + assertFalse(trieSet.retainAll(toRetain)); + assertEquals(2, trieSet.size()); + } + + @Test + public void testIterator() { + trieSet.add("apple"); + trieSet.add("banana"); + trieSet.add("cherry"); + + Set iterated = new HashSet<>(); + for (String s : trieSet) { + iterated.add(s); + } + assertEquals(3, iterated.size()); + assertTrue(iterated.contains("apple")); + assertTrue(iterated.contains("banana")); + assertTrue(iterated.contains("cherry")); + } + + @Test + public void testIteratorRemove() { + trieSet.add("apple"); + trieSet.add("banana"); + trieSet.add("cherry"); + + Iterator it = trieSet.iterator(); + while (it.hasNext()) { + String s = it.next(); + if (s.equals("banana")) { + it.remove(); + } + } + assertEquals(2, trieSet.size()); + assertFalse(trieSet.contains("banana")); + } + + @Test + public void testToArray() { + trieSet.add("apple"); + trieSet.add("banana"); + trieSet.add("cherry"); + + Object[] array = trieSet.toArray(); + assertEquals(3, array.length); + Set arraySet = new HashSet<>(Arrays.asList((String[]) Arrays.copyOf(array, array.length, String[].class))); + assertTrue(arraySet.contains("apple")); + assertTrue(arraySet.contains("banana")); + assertTrue(arraySet.contains("cherry")); + } + + @Test + public void testToArrayTyped() { + trieSet.add("apple"); + trieSet.add("banana"); + + String[] array = trieSet.toArray(new String[0]); + assertEquals(2, array.length); + Set arraySet = new HashSet<>(Arrays.asList(array)); + assertTrue(arraySet.contains("apple")); + assertTrue(arraySet.contains("banana")); + } + + // ========== PREFIX QUERY TESTS ========== + + @Test + public void testPrefixSetEmpty() { + Set result = trieSet.prefixSet("app"); + assertTrue(result.isEmpty()); + } + + @Test + public void testPrefixSetExactMatch() { + trieSet.add("apple"); + Set result = trieSet.prefixSet("apple"); + assertEquals(1, result.size()); + assertTrue(result.contains("apple")); + } + + @Test + public void testPrefixSetMultipleMatches() { + trieSet.add("apple"); + trieSet.add("application"); + trieSet.add("apply"); + trieSet.add("ape"); + trieSet.add("banana"); + + Set result = trieSet.prefixSet("app"); + assertEquals(3, result.size()); + assertTrue(result.contains("apple")); + assertTrue(result.contains("application")); + assertTrue(result.contains("apply")); + assertFalse(result.contains("ape")); + assertFalse(result.contains("banana")); + } + + @Test + public void testPrefixSetSingleCharacter() { + trieSet.add("apple"); + trieSet.add("application"); + trieSet.add("ape"); + trieSet.add("banana"); + + Set result = trieSet.prefixSet("a"); + assertEquals(3, result.size()); + assertTrue(result.contains("apple")); + assertTrue(result.contains("application")); + assertTrue(result.contains("ape")); + assertFalse(result.contains("banana")); + } + + @Test + public void testPrefixSetNoMatches() { + trieSet.add("apple"); + trieSet.add("banana"); + + Set result = trieSet.prefixSet("cherry"); + assertTrue(result.isEmpty()); + } + + @Test + public void testPrefixSetEmptyPrefix() { + trieSet.add("apple"); + trieSet.add("banana"); + trieSet.add("cherry"); + + Set result = trieSet.prefixSet(""); + assertEquals(3, result.size()); + } + + @Test + public void testPrefixSetLongerThanAnyElement() { + trieSet.add("app"); + + Set result = trieSet.prefixSet("application"); + assertTrue(result.isEmpty()); + } + + @Test + public void testPrefixSetCommonPrefixes() { + trieSet.add("topic-foo-1"); + trieSet.add("topic-foo-2"); + trieSet.add("topic-foo-3"); + trieSet.add("topic-bar-1"); + trieSet.add("other-topic"); + + Set result = trieSet.prefixSet("topic-foo"); + assertEquals(3, result.size()); + assertTrue(result.contains("topic-foo-1")); + assertTrue(result.contains("topic-foo-2")); + assertTrue(result.contains("topic-foo-3")); + assertFalse(result.contains("topic-bar-1")); + assertFalse(result.contains("other-topic")); + } + + @Test + public void testPrefixSetNestedPrefixes() { + trieSet.add("a"); + trieSet.add("ab"); + trieSet.add("abc"); + trieSet.add("abcd"); + + Set result = trieSet.prefixSet("ab"); + assertEquals(3, result.size()); + assertTrue(result.contains("ab")); + assertTrue(result.contains("abc")); + assertTrue(result.contains("abcd")); + assertFalse(result.contains("a")); + } + + // ========== PERFORMANCE/STRESS TESTS ========== + + @Test + public void testLargeDataset() { + for (int i = 0; i < 1000; i++) { + trieSet.add("element-" + i); + } + assertEquals(1000, trieSet.size()); + + Set result = trieSet.prefixSet("element-1"); + // Should match: element-1, element-10, element-11, ..., element-19, element-100, ..., element-199 + assertTrue(result.size() > 1); + assertTrue(result.contains("element-1")); + assertTrue(result.contains("element-10")); + assertTrue(result.contains("element-100")); + } + + @Test + public void testPrefixSetAfterRemoval() { + trieSet.add("apple"); + trieSet.add("application"); + trieSet.add("apply"); + + Set result1 = trieSet.prefixSet("app"); + assertEquals(3, result1.size()); + + trieSet.remove("application"); + + Set result2 = trieSet.prefixSet("app"); + assertEquals(2, result2.size()); + assertTrue(result2.contains("apple")); + assertTrue(result2.contains("apply")); + assertFalse(result2.contains("application")); + } + + @Test + public void testRemoveAllFromPrefixSet() { + trieSet.add("topic-foo-1"); + trieSet.add("topic-foo-2"); + trieSet.add("topic-foo-3"); + trieSet.add("topic-bar-1"); + trieSet.add("other-topic"); + + // Get all with prefix "topic-foo" - need to copy to avoid ConcurrentModificationException + // since prefixSet() returns a view backed by the trie + Set prefixMatches = new HashSet<>(trieSet.prefixSet("topic-foo")); + assertEquals(3, prefixMatches.size()); + + // Remove them all + assertTrue(trieSet.removeAll(prefixMatches)); + assertEquals(2, trieSet.size()); + assertTrue(trieSet.contains("topic-bar-1")); + assertTrue(trieSet.contains("other-topic")); + assertFalse(trieSet.contains("topic-foo-1")); + assertFalse(trieSet.contains("topic-foo-2")); + assertFalse(trieSet.contains("topic-foo-3")); + } + + @Test + public void testPrefixSetWithSpecialCharacters() { + trieSet.add("user-123"); + trieSet.add("user-456"); + trieSet.add("user_admin"); + trieSet.add("user.guest"); + + Set result = trieSet.prefixSet("user-"); + assertEquals(2, result.size()); + assertTrue(result.contains("user-123")); + assertTrue(result.contains("user-456")); + } + + @Test + public void testPrefixSetCaseSensitive() { + trieSet.add("Apple"); + trieSet.add("application"); + trieSet.add("APPLY"); + + Set result1 = trieSet.prefixSet("app"); + assertEquals(1, result1.size()); + assertTrue(result1.contains("application")); + + Set result2 = trieSet.prefixSet("App"); + assertEquals(1, result2.size()); + assertTrue(result2.contains("Apple")); + + Set result3 = trieSet.prefixSet("APP"); + assertEquals(1, result3.size()); + assertTrue(result3.contains("APPLY")); + } +} From 57ee90d54dfa91b41e6bec57721088cc247d4657 Mon Sep 17 00:00:00 2001 From: Guang Zhao Date: Thu, 20 Nov 2025 07:18:07 +1100 Subject: [PATCH 5/7] Use AbstractPatriciaTrie.size for constant-time Set.isEmpty --- .../src/main/java/org/apache/kafka/common/utils/TrieSet.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/TrieSet.java b/clients/src/main/java/org/apache/kafka/common/utils/TrieSet.java index 4fdcb70a27b01..033bf84d665fc 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/TrieSet.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/TrieSet.java @@ -59,7 +59,7 @@ public int size() { @Override public boolean isEmpty() { - return trie.isEmpty(); + return size() == 0; } @Override From 6ca28b61b19d680e8c1c5dc336b51aa9c030ec22 Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Sat, 22 Nov 2025 16:09:24 +0300 Subject: [PATCH 6/7] Add more test and fix bench error --- .../authorizer/BaseAuthorizerTest.scala | 20 +++++++++++++++++++ .../kafka/jmh/acl/AuthorizerBenchmark.java | 7 +++++++ 2 files changed, 27 insertions(+) diff --git a/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala index c7726ff52454f..996d962fbe89f 100644 --- a/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala @@ -141,6 +141,26 @@ trait BaseAuthorizerTest { "User1 from host1 should not have WRITE access to any topic") } + @Test + def testAuthorizeByResourceTypeLiteralResourceDenyDominate(): Unit = { + val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1") + val host1 = InetAddress.getByName("192.168.1.1") + val abcd = new ResourcePattern(GROUP, "abcd", PREFIXED) + val abcde = new ResourcePattern(GROUP, "abcde", LITERAL) + + val u1h1Context = newRequestContext(user1, host1) + val allowAce = new AccessControlEntry(user1.toString, host1.getHostAddress, READ, ALLOW) + val denyAce = new AccessControlEntry(user1.toString, host1.getHostAddress, READ, DENY) + + addAcls(authorizer, Set(allowAce), abcde) + assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ, ResourceType.GROUP), + "User1 from host1 should have READ access to at least one group") + + addAcls(authorizer, Set(denyAce), abcd) + assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, ResourceType.GROUP), + "User1 from host1 now should not have READ access to any group") + } + @Test def testAuthorizeByResourceTypePrefixedResourceDenyDominate(): Unit = { val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1") diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java index 6d4536301f576..9e36ba69c7529 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java @@ -23,6 +23,8 @@ import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.internals.PluginMetricsImpl; import org.apache.kafka.common.network.ClientInformation; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.protocol.ApiKeys; @@ -202,6 +204,11 @@ public void tearDown() throws IOException { authorizer.close(); } + @Setup(Level.Iteration) + public void setupIteration() { + authorizer.withPluginMetrics(new PluginMetricsImpl(new Metrics(), new HashMap<>(1000000))); + } + @Benchmark public Iterable testAclsIterator() { return authorizer.acls(filter); From 190055c4ddb18b19ac2ba21141364ebf45eb3eda Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Sun, 7 Dec 2025 17:50:03 +0300 Subject: [PATCH 7/7] Add bench on PatriciaTrie --- .../acl/AuthorizeByResourceTypeSearch.java | 210 ++++++++++++++++++ 1 file changed, 210 insertions(+) create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizeByResourceTypeSearch.java diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizeByResourceTypeSearch.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizeByResourceTypeSearch.java new file mode 100644 index 0000000000000..1066fa82b0bea --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizeByResourceTypeSearch.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.jmh.acl; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.acl.*; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.internals.PluginMetricsImpl; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.metadata.authorizer.StandardAcl; +import org.apache.kafka.metadata.authorizer.StandardAuthorizer; +import org.apache.kafka.server.authorizer.AuthorizationResult; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.TimeValue; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.*; +import java.util.concurrent.TimeUnit; + +/** + * Benchmark (resourceNamePrefix) Mode Cnt Score Error Units + * AuthorizeByResourceTypeSearch.testAuthorizeByResourceType AuthorizeByResourceTypeSearch- avgt 7 4.252 ± 0.024 us/op + * AuthorizeByResourceTypeSearch.testAuthorizeByResourceType Authorize...Check- avgt 7 4.301 ± 0.030 us/op + * AuthorizeByResourceTypeSearch.testAuthorizeByResourceType Authorize...Difference- avgt 7 4.592 ± 0.042 us/op + *

+ * + */ +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 15) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class AuthorizeByResourceTypeSearch { + + @Param({ + "AuthorizeByResourceTypeSearch-", + "AuthorizeByResourceTypeSearchOneMoreWordLongForDenyPatternCheck-", + "AuthorizeByResourceTypeSearchOneMoreWordLongForDenyPatternCheckAndWeAddOneMoreWordJustInCaseToShowBenchmarkDifference-", + }) + String resourceNamePrefix; + + @Param({"4","10","50"}) + int typeOfPrefixedAndLiteralPattern; + + private KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user"); + private String authorizeByResourceTypeHostName = "127.0.0.2"; + private StandardAuthorizer authorizer; + private RequestContext authorizeByResourceTypeContext; + private AclBindingFilter filter; + private AclOperation op; + private ResourceType resourceType; + + @Setup(Level.Trial) + public void setup() throws Exception { + authorizer = new StandardAuthorizer(); + filter = AclBindingFilter.ANY; + op = AclOperation.READ; + resourceType = ResourceType.TOPIC; + prepareAclCache(); + authorizeByResourceTypeContext = new RequestContext(new RequestHeader(ApiKeys.PRODUCE, Integer.valueOf(1).shortValue(), + "someclient", 1), "1", InetAddress.getByName(authorizeByResourceTypeHostName), principal, + ListenerName.normalised("listener"), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false); + } + + + /** + * What we do in this test: + *

+ * For every Allow Literal Pattern -- iterate on count of typeOfPrefixedAndLiteralPattern + * check in denyPatternsLiteral map -- check on exists then no found + * check in denyPatternsPrefixed -- find in denyPatternsPrefixed prefix + * return if allow (in this test we don't find) + *

+ * for every Allow Prefix pattern + * check in denyPatternsPrefixed -- find in denyPatternsPrefixed prefix + * return allow + *

+ * return deny - never reach + *

+ * We iterate + * typeOfPrefixedAndLiteralPattern + 1 counts. + * Make (typeOfPrefixedAndLiteralPattern + 1) * 2 search in PatriciaTrie + */ + private void prepareAclCache() { + Map> aclEntries = new HashMap<>(); + + String prefix = "a"; + + List patterns = new ArrayList<>(); + for(int i = 0; i< typeOfPrefixedAndLiteralPattern; i++) { + patterns.add(resourceNamePrefix + prefix.repeat(i+1)); + } + + String allowPattern = resourceNamePrefix; + + for(String pattern : patterns) { + // PREFIX DENY + makeDeny(pattern, aclEntries, PatternType.PREFIXED); + // ALLOW LITERAL + makeAllow(pattern, aclEntries, PatternType.LITERAL); + } + + // Add one Allow + makeAllow(allowPattern, aclEntries, PatternType.PREFIXED); + + + // makeDeny(denyPattern1, aclEntries); + setupAcls(aclEntries); + } + + private void makeDeny(String denyPattern, Map> aclEntries, PatternType patternType) { + ResourcePattern resource = new ResourcePattern(ResourceType.TOPIC, denyPattern, + patternType); + + Set entries = aclEntries.computeIfAbsent(resource, k -> new HashSet<>()); + + AccessControlEntry denyAce = new AccessControlEntry(principal.toString(), authorizeByResourceTypeHostName, + AclOperation.READ, AclPermissionType.DENY); + + entries.add(denyAce); + } + + private void makeAllow(String denyPattern, Map> aclEntries, PatternType patternType) { + ResourcePattern resourceAllow = new ResourcePattern(ResourceType.TOPIC, denyPattern, + patternType); + + Set entriesAllow = aclEntries.computeIfAbsent(resourceAllow, k -> new HashSet<>()); + + AccessControlEntry allowAce = new AccessControlEntry(principal.toString(), authorizeByResourceTypeHostName, + AclOperation.READ, AclPermissionType.ALLOW); + + entriesAllow.add(allowAce); + } + + private void setupAcls(Map> aclEntries) { + for (Map.Entry> entryMap : aclEntries.entrySet()) { + ResourcePattern resourcePattern = entryMap.getKey(); + + for (AccessControlEntry accessControlEntry : entryMap.getValue()) { + StandardAcl standardAcl = StandardAcl.fromAclBinding(new AclBinding(resourcePattern, accessControlEntry)); + authorizer.addAcl(Uuid.randomUuid(), standardAcl); + } + authorizer.completeInitialLoad(); + + } + } + + @Setup(Level.Iteration) + public void setupIteration() { + authorizer.withPluginMetrics(new PluginMetricsImpl(new Metrics(), new HashMap<>(1000000))); + } + + @TearDown(Level.Trial) + public void tearDown() throws IOException { + authorizer.close(); + } + + @Benchmark + public AuthorizationResult testAuthorizeByResourceType() { + return authorizer.authorizeByResourceType(authorizeByResourceTypeContext, op, resourceType); + } + + public static void main(String[] args) { + Options opt = new OptionsBuilder() + .include(AuthorizeByResourceTypeSearch.class.getSimpleName()) + .warmupIterations(7) + .warmupTime(TimeValue.seconds(1)) + .measurementIterations(7) + .measurementTime(TimeValue.seconds(1)) + .timeUnit(TimeUnit.MICROSECONDS) + .forks(1) + .build(); + try { + new Runner(opt).run(); + } catch (RunnerException e) { + e.printStackTrace(); + } + } +}