From 58894b1f5ce028d77bbbed3f20f00d3f01da2d08 Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Sat, 11 Oct 2025 19:34:16 +0300 Subject: [PATCH 01/11] KAFKA-19782 Add Patricia Trie to Authorizer --- LICENSE-binary | 1 + build.gradle | 1 + checkstyle/import-control.xml | 4 ++ .../kafka/server/authorizer/Authorizer.java | 63 +++++++++---------- gradle/dependencies.gradle | 2 + .../kafka/jmh/acl/AuthorizerBenchmark.java | 23 ++++++- 6 files changed, 60 insertions(+), 34 deletions(-) diff --git a/LICENSE-binary b/LICENSE-binary index 91c8865931ea2..73e77e3bfd86b 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -208,6 +208,7 @@ License Version 2.0: - caffeine-3.2.0 - commons-beanutils-1.11.0 - commons-collections-3.2.2 +- commons-collections4-4.5.0 - commons-digester-2.1 - commons-lang3-3.18.0 - commons-logging-1.3.5 diff --git a/build.gradle b/build.gradle index dc3bf215ec88f..061832069b5d9 100644 --- a/build.gradle +++ b/build.gradle @@ -1856,6 +1856,7 @@ project(':clients') { implementation libs.opentelemetryProto implementation libs.protobuf implementation libs.slf4jApi + implementation libs.commonsCollection // libraries which should be added as runtime dependencies in generated pom.xml should be defined here: shadowed libs.zstd diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index c7f9eaad7ea08..6b636092fde73 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -269,6 +269,10 @@ + + + + diff --git a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java index 68f1aaf678d1d..5510c32b71591 100644 --- a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java +++ b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java @@ -17,6 +17,8 @@ package org.apache.kafka.server.authorizer; +import org.apache.commons.collections4.Trie; +import org.apache.commons.collections4.trie.PatriciaTrie; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.Endpoint; import org.apache.kafka.common.acl.AccessControlEntryFilter; @@ -33,7 +35,6 @@ import java.io.Closeable; import java.util.Collections; -import java.util.EnumMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -200,16 +201,14 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), AclBindingFilter aclFilter = new AclBindingFilter( resourceTypeFilter, AccessControlEntryFilter.ANY); - EnumMap> denyPatterns = - new EnumMap<>(PatternType.class) {{ - put(PatternType.LITERAL, new HashSet<>()); - put(PatternType.PREFIXED, new HashSet<>()); - }}; - EnumMap> allowPatterns = - new EnumMap<>(PatternType.class) {{ - put(PatternType.LITERAL, new HashSet<>()); - put(PatternType.PREFIXED, new HashSet<>()); - }}; + // stub for Patricia map + final Object val = new Object(); + + Set denyPatternsLiteral = new HashSet<>(); + Trie denyPatternsPrefixed = new PatriciaTrie<>(); + + Set allowPatternsLiteral = new HashSet<>(); + Map allowPatternsPrefixed = new PatriciaTrie<>(); boolean hasWildCardAllow = false; @@ -236,10 +235,10 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), // If wildcard deny exists, return deny directly if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) return AuthorizationResult.DENIED; - denyPatterns.get(PatternType.LITERAL).add(binding.pattern().name()); + denyPatternsLiteral.add(binding.pattern().name()); break; case PREFIXED: - denyPatterns.get(PatternType.PREFIXED).add(binding.pattern().name()); + denyPatternsPrefixed.put(binding.pattern().name(), val); break; default: } @@ -255,10 +254,10 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), hasWildCardAllow = true; continue; } - allowPatterns.get(PatternType.LITERAL).add(binding.pattern().name()); + allowPatternsLiteral.add(binding.pattern().name()); break; case PREFIXED: - allowPatterns.get(PatternType.PREFIXED).add(binding.pattern().name()); + allowPatternsPrefixed.put(binding.pattern().name(), val); break; default: } @@ -269,27 +268,25 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), } // For any literal allowed, if there's no dominant literal and prefix denied, return allow. - // For any prefix allowed, if there's no dominant prefix denied, return allow. - for (Map.Entry> entry : allowPatterns.entrySet()) { - for (String allowStr : entry.getValue()) { - if (entry.getKey() == PatternType.LITERAL - && denyPatterns.get(PatternType.LITERAL).contains(allowStr)) - continue; - StringBuilder sb = new StringBuilder(); - boolean hasDominatedDeny = false; - for (char ch : allowStr.toCharArray()) { - sb.append(ch); - if (denyPatterns.get(PatternType.PREFIXED).contains(sb.toString())) { - hasDominatedDeny = true; - break; - } - } - if (!hasDominatedDeny) - return AuthorizationResult.ALLOWED; + for (String allowStr : allowPatternsLiteral) { + if (denyPatternsLiteral.contains(allowStr)) { + continue; } + + boolean hasDominatedDeny = !denyPatternsPrefixed.prefixMap(allowStr).isEmpty(); + + if (!hasDominatedDeny) + return AuthorizationResult.ALLOWED; + } + + // For any prefix allowed, if there's no dominant prefix denied, return allow. + for (String allowStr : allowPatternsPrefixed.keySet()) { + boolean hasDominatedDeny = !denyPatternsPrefixed.prefixMap(allowStr).isEmpty(); + + if (!hasDominatedDeny) + return AuthorizationResult.ALLOWED; } return AuthorizationResult.DENIED; } - } diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index b85dc87d78fac..51f13dfc106e6 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -57,6 +57,7 @@ versions += [ caffeine: "3.2.0", bndlib: "7.1.0", checkstyle: project.hasProperty('checkstyleVersion') ? checkstyleVersion : "10.20.2", + commonsCollection: "4.5.0", commonsLang: "3.18.0", commonsValidator: "1.10.0", classgraph: "4.8.179", @@ -150,6 +151,7 @@ libs += [ bndlib:"biz.aQute.bnd:biz.aQute.bndlib:$versions.bndlib", caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine", classgraph: "io.github.classgraph:classgraph:$versions.classgraph", + commonsCollection: "org.apache.commons:commons-collections4:$versions.commonsCollection", commonsLang: "org.apache.commons:commons-lang3:$versions.commonsLang", commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator", jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:$versions.jackson", diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java index 6d4536301f576..f227f444e9d66 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java @@ -51,6 +51,11 @@ import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.TimeValue; import java.io.IOException; import java.net.InetAddress; @@ -71,7 +76,7 @@ @OutputTimeUnit(TimeUnit.MILLISECONDS) public class AuthorizerBenchmark { - @Param({"10000", "50000", "200000"}) + @Param({"10000"}) private int resourceCount; //no. of. rules per resource @Param({"10", "50"}) @@ -216,4 +221,20 @@ public List testAuthorizer() { public AuthorizationResult testAuthorizeByResourceType() { return authorizer.authorizeByResourceType(authorizeByResourceTypeContext, op, resourceType); } + + public static void main(String[] args) { + Options opt = new OptionsBuilder() + .include(AuthorizerBenchmark.class.getSimpleName() + ".testAuthorizeByResourceType") + .warmupIterations(5) + .warmupTime(TimeValue.seconds(1)) + .measurementIterations(15) + .measurementTime(TimeValue.seconds(1)) + .forks(1) + .build(); + try { + new Runner(opt).run(); + } catch (RunnerException e) { + e.printStackTrace(); + } + } } From 281156df935052da271169741302af24fb9f71f7 Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Tue, 18 Nov 2025 13:22:11 +0300 Subject: [PATCH 02/11] KAFKA-19782 Add Patricia Trie to Authorizer --- .../kafka/server/authorizer/Authorizer.java | 146 ++++++++++++++++-- .../authorizer/BaseAuthorizerTest.scala | 20 +++ .../kafka/jmh/acl/AuthorizerBenchmark.java | 26 ++-- 3 files changed, 160 insertions(+), 32 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java index 5510c32b71591..c57fe4fbb5486 100644 --- a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java +++ b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java @@ -17,7 +17,6 @@ package org.apache.kafka.server.authorizer; -import org.apache.commons.collections4.Trie; import org.apache.commons.collections4.trie.PatriciaTrie; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.Endpoint; @@ -34,11 +33,7 @@ import org.apache.kafka.common.utils.SecurityUtils; import java.io.Closeable; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.CompletionStage; /** @@ -182,6 +177,117 @@ default int aclCount() { * to perform the given ACL operation on at least one resource of the * given type. Return {@link AuthorizationResult#DENIED} otherwise. */ + default AuthorizationResult authorizeByResourceType1(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); + + // Check a hard-coded name to ensure that super users are granted + // access regardless of DENY ACLs. + if (authorize(requestContext, Collections.singletonList(new Action( + op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), + 0, true, false))) + .get(0) == AuthorizationResult.ALLOWED) { + return AuthorizationResult.ALLOWED; + } + + // Filter out all the resource pattern corresponding to the RequestContext, + // AclOperation, and ResourceType + ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter( + resourceType, null, PatternType.ANY); + AclBindingFilter aclFilter = new AclBindingFilter( + resourceTypeFilter, AccessControlEntryFilter.ANY); + + EnumMap> denyPatterns = + new EnumMap<>(PatternType.class) {{ + put(PatternType.LITERAL, new HashSet<>()); + put(PatternType.PREFIXED, new HashSet<>()); + }}; + EnumMap> allowPatterns = + new EnumMap<>(PatternType.class) {{ + put(PatternType.LITERAL, new HashSet<>()); + put(PatternType.PREFIXED, new HashSet<>()); + }}; + + boolean hasWildCardAllow = false; + + KafkaPrincipal principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType(), + requestContext.principal().getName()); + String hostAddr = requestContext.clientAddress().getHostAddress(); + + for (AclBinding binding : acls(aclFilter)) { + if (!binding.entry().host().equals(hostAddr) && !binding.entry().host().equals("*")) + continue; + + if (!SecurityUtils.parseKafkaPrincipal(binding.entry().principal()).equals(principal) + && !binding.entry().principal().equals("User:*")) + continue; + + if (binding.entry().operation() != op + && binding.entry().operation() != AclOperation.ALL) + continue; + + if (binding.entry().permissionType() == AclPermissionType.DENY) { + switch (binding.pattern().patternType()) { + case LITERAL: + // If wildcard deny exists, return deny directly + if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) + return AuthorizationResult.DENIED; + denyPatterns.get(PatternType.LITERAL).add(binding.pattern().name()); + break; + case PREFIXED: + denyPatterns.get(PatternType.PREFIXED).add(binding.pattern().name()); + break; + default: + } + continue; + } + + if (binding.entry().permissionType() != AclPermissionType.ALLOW) + continue; + + switch (binding.pattern().patternType()) { + case LITERAL: + if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) { + hasWildCardAllow = true; + continue; + } + allowPatterns.get(PatternType.LITERAL).add(binding.pattern().name()); + break; + case PREFIXED: + allowPatterns.get(PatternType.PREFIXED).add(binding.pattern().name()); + break; + default: + } + } + + if (hasWildCardAllow) { + return AuthorizationResult.ALLOWED; + } + + // For any literal allowed, if there's no dominant literal and prefix denied, return allow. + // For any prefix allowed, if there's no dominant prefix denied, return allow. + for (Map.Entry> entry : allowPatterns.entrySet()) { + for (String allowStr : entry.getValue()) { + if (entry.getKey() == PatternType.LITERAL + && denyPatterns.get(PatternType.LITERAL).contains(allowStr)) + continue; + StringBuilder sb = new StringBuilder(); + boolean hasDominatedDeny = false; + for (char ch : allowStr.toCharArray()) { + sb.append(ch); + if (denyPatterns.get(PatternType.PREFIXED).contains(sb.toString())) { + hasDominatedDeny = true; + break; + } + } + if (!hasDominatedDeny) + return AuthorizationResult.ALLOWED; + } + } + + return AuthorizationResult.DENIED; + } + default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); @@ -201,14 +307,11 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), AclBindingFilter aclFilter = new AclBindingFilter( resourceTypeFilter, AccessControlEntryFilter.ANY); - // stub for Patricia map - final Object val = new Object(); - Set denyPatternsLiteral = new HashSet<>(); - Trie denyPatternsPrefixed = new PatriciaTrie<>(); + PatriciaTrie denyPatternsPrefixed = new PatriciaTrie<>(); Set allowPatternsLiteral = new HashSet<>(); - Map allowPatternsPrefixed = new PatriciaTrie<>(); + PatriciaTrie allowPatternsPrefixed = new PatriciaTrie<>(); boolean hasWildCardAllow = false; @@ -238,7 +341,7 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), denyPatternsLiteral.add(binding.pattern().name()); break; case PREFIXED: - denyPatternsPrefixed.put(binding.pattern().name(), val); + denyPatternsPrefixed.put(binding.pattern().name(), Boolean.TRUE); break; default: } @@ -257,7 +360,7 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), allowPatternsLiteral.add(binding.pattern().name()); break; case PREFIXED: - allowPatternsPrefixed.put(binding.pattern().name(), val); + allowPatternsPrefixed.put(binding.pattern().name(), Boolean.TRUE); break; default: } @@ -273,7 +376,16 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), continue; } - boolean hasDominatedDeny = !denyPatternsPrefixed.prefixMap(allowStr).isEmpty(); + boolean hasDominatedDeny = false; + + if(!denyPatternsPrefixed.isEmpty()) { + hasDominatedDeny = !denyPatternsPrefixed.headMap(allowStr).isEmpty() || denyPatternsPrefixed.containsKey(allowStr); + } + + // String denyPrefix = denyPatternsPrefixed.selectKey(allowStr); + // boolean hasDominatedDeny = (denyPrefix != null); + + // boolean hasDominatedDeny = !denyPatternsPrefixed.prefixMap(allowStr).isEmpty(); if (!hasDominatedDeny) return AuthorizationResult.ALLOWED; @@ -281,7 +393,11 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), // For any prefix allowed, if there's no dominant prefix denied, return allow. for (String allowStr : allowPatternsPrefixed.keySet()) { - boolean hasDominatedDeny = !denyPatternsPrefixed.prefixMap(allowStr).isEmpty(); + boolean hasDominatedDeny = false; + + if(!denyPatternsPrefixed.isEmpty()) { + hasDominatedDeny = !denyPatternsPrefixed.headMap(allowStr).isEmpty() || denyPatternsPrefixed.containsKey(allowStr); + } if (!hasDominatedDeny) return AuthorizationResult.ALLOWED; diff --git a/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala index c7726ff52454f..996d962fbe89f 100644 --- a/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala @@ -141,6 +141,26 @@ trait BaseAuthorizerTest { "User1 from host1 should not have WRITE access to any topic") } + @Test + def testAuthorizeByResourceTypeLiteralResourceDenyDominate(): Unit = { + val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1") + val host1 = InetAddress.getByName("192.168.1.1") + val abcd = new ResourcePattern(GROUP, "abcd", PREFIXED) + val abcde = new ResourcePattern(GROUP, "abcde", LITERAL) + + val u1h1Context = newRequestContext(user1, host1) + val allowAce = new AccessControlEntry(user1.toString, host1.getHostAddress, READ, ALLOW) + val denyAce = new AccessControlEntry(user1.toString, host1.getHostAddress, READ, DENY) + + addAcls(authorizer, Set(allowAce), abcde) + assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ, ResourceType.GROUP), + "User1 from host1 should have READ access to at least one group") + + addAcls(authorizer, Set(denyAce), abcd) + assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, ResourceType.GROUP), + "User1 from host1 now should not have READ access to any group") + } + @Test def testAuthorizeByResourceTypePrefixedResourceDenyDominate(): Unit = { val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1") diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java index f227f444e9d66..14ef2389e2211 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java @@ -23,6 +23,8 @@ import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.internals.PluginMetricsImpl; import org.apache.kafka.common.network.ClientInformation; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.protocol.ApiKeys; @@ -51,6 +53,7 @@ import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.profile.GCProfiler; import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.RunnerException; import org.openjdk.jmh.runner.options.Options; @@ -76,7 +79,7 @@ @OutputTimeUnit(TimeUnit.MILLISECONDS) public class AuthorizerBenchmark { - @Param({"10000"}) + @Param({"10000", "50000"}) private int resourceCount; //no. of. rules per resource @Param({"10", "50"}) @@ -202,6 +205,11 @@ private Boolean shouldDeny() { return rand.nextDouble() * 100.0 - eps < denyPercentage; } + @Setup(Level.Iteration) + public void setupIteration() { + authorizer.withPluginMetrics(new PluginMetricsImpl(new Metrics(), new HashMap<>(1000000))); + } + @TearDown(Level.Trial) public void tearDown() throws IOException { authorizer.close(); @@ -221,20 +229,4 @@ public List testAuthorizer() { public AuthorizationResult testAuthorizeByResourceType() { return authorizer.authorizeByResourceType(authorizeByResourceTypeContext, op, resourceType); } - - public static void main(String[] args) { - Options opt = new OptionsBuilder() - .include(AuthorizerBenchmark.class.getSimpleName() + ".testAuthorizeByResourceType") - .warmupIterations(5) - .warmupTime(TimeValue.seconds(1)) - .measurementIterations(15) - .measurementTime(TimeValue.seconds(1)) - .forks(1) - .build(); - try { - new Runner(opt).run(); - } catch (RunnerException e) { - e.printStackTrace(); - } - } } From 718170eddd88d72dacea0655abce69b62050a428 Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Tue, 18 Nov 2025 13:26:59 +0300 Subject: [PATCH 03/11] KAFKA-19782 Add Patricia Trie to Authorizer --- .../kafka/server/authorizer/Authorizer.java | 131 ------------------ 1 file changed, 131 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java index c57fe4fbb5486..9aed892ee942f 100644 --- a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java +++ b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java @@ -157,137 +157,6 @@ default int aclCount() { return -1; } - /** - * Check if the caller is authorized to perform the given ACL operation on at least one - * resource of the given type. - * - * Custom authorizer implementations should consider overriding this default implementation because: - * 1. The default implementation iterates all AclBindings multiple times, without any caching - * by principal, host, operation, permission types, and resource types. More efficient - * implementations may be added in custom authorizers that directly access cached entries. - * 2. The default implementation cannot integrate with any audit logging included in the - * authorizer implementation. - * 3. The default implementation does not support any custom authorizer configs or other access - * rules apart from ACLs. - * - * @param requestContext Request context including request resourceType, security protocol and listener name - * @param op The ACL operation to check - * @param resourceType The resource type to check - * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized - * to perform the given ACL operation on at least one resource of the - * given type. Return {@link AuthorizationResult#DENIED} otherwise. - */ - default AuthorizationResult authorizeByResourceType1(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { - SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); - - // Check a hard-coded name to ensure that super users are granted - // access regardless of DENY ACLs. - if (authorize(requestContext, Collections.singletonList(new Action( - op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), - 0, true, false))) - .get(0) == AuthorizationResult.ALLOWED) { - return AuthorizationResult.ALLOWED; - } - - // Filter out all the resource pattern corresponding to the RequestContext, - // AclOperation, and ResourceType - ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter( - resourceType, null, PatternType.ANY); - AclBindingFilter aclFilter = new AclBindingFilter( - resourceTypeFilter, AccessControlEntryFilter.ANY); - - EnumMap> denyPatterns = - new EnumMap<>(PatternType.class) {{ - put(PatternType.LITERAL, new HashSet<>()); - put(PatternType.PREFIXED, new HashSet<>()); - }}; - EnumMap> allowPatterns = - new EnumMap<>(PatternType.class) {{ - put(PatternType.LITERAL, new HashSet<>()); - put(PatternType.PREFIXED, new HashSet<>()); - }}; - - boolean hasWildCardAllow = false; - - KafkaPrincipal principal = new KafkaPrincipal( - requestContext.principal().getPrincipalType(), - requestContext.principal().getName()); - String hostAddr = requestContext.clientAddress().getHostAddress(); - - for (AclBinding binding : acls(aclFilter)) { - if (!binding.entry().host().equals(hostAddr) && !binding.entry().host().equals("*")) - continue; - - if (!SecurityUtils.parseKafkaPrincipal(binding.entry().principal()).equals(principal) - && !binding.entry().principal().equals("User:*")) - continue; - - if (binding.entry().operation() != op - && binding.entry().operation() != AclOperation.ALL) - continue; - - if (binding.entry().permissionType() == AclPermissionType.DENY) { - switch (binding.pattern().patternType()) { - case LITERAL: - // If wildcard deny exists, return deny directly - if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) - return AuthorizationResult.DENIED; - denyPatterns.get(PatternType.LITERAL).add(binding.pattern().name()); - break; - case PREFIXED: - denyPatterns.get(PatternType.PREFIXED).add(binding.pattern().name()); - break; - default: - } - continue; - } - - if (binding.entry().permissionType() != AclPermissionType.ALLOW) - continue; - - switch (binding.pattern().patternType()) { - case LITERAL: - if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) { - hasWildCardAllow = true; - continue; - } - allowPatterns.get(PatternType.LITERAL).add(binding.pattern().name()); - break; - case PREFIXED: - allowPatterns.get(PatternType.PREFIXED).add(binding.pattern().name()); - break; - default: - } - } - - if (hasWildCardAllow) { - return AuthorizationResult.ALLOWED; - } - - // For any literal allowed, if there's no dominant literal and prefix denied, return allow. - // For any prefix allowed, if there's no dominant prefix denied, return allow. - for (Map.Entry> entry : allowPatterns.entrySet()) { - for (String allowStr : entry.getValue()) { - if (entry.getKey() == PatternType.LITERAL - && denyPatterns.get(PatternType.LITERAL).contains(allowStr)) - continue; - StringBuilder sb = new StringBuilder(); - boolean hasDominatedDeny = false; - for (char ch : allowStr.toCharArray()) { - sb.append(ch); - if (denyPatterns.get(PatternType.PREFIXED).contains(sb.toString())) { - hasDominatedDeny = true; - break; - } - } - if (!hasDominatedDeny) - return AuthorizationResult.ALLOWED; - } - } - - return AuthorizationResult.DENIED; - } - default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); From d65416eb8171fbe3755f4db7b76488e33f9e9b53 Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Tue, 18 Nov 2025 13:26:59 +0300 Subject: [PATCH 04/11] KAFKA-19782 Add Patricia Trie to Authorizer --- .../kafka/server/authorizer/Authorizer.java | 136 ------------------ 1 file changed, 136 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java index c57fe4fbb5486..c77ebc9febc85 100644 --- a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java +++ b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java @@ -157,137 +157,6 @@ default int aclCount() { return -1; } - /** - * Check if the caller is authorized to perform the given ACL operation on at least one - * resource of the given type. - * - * Custom authorizer implementations should consider overriding this default implementation because: - * 1. The default implementation iterates all AclBindings multiple times, without any caching - * by principal, host, operation, permission types, and resource types. More efficient - * implementations may be added in custom authorizers that directly access cached entries. - * 2. The default implementation cannot integrate with any audit logging included in the - * authorizer implementation. - * 3. The default implementation does not support any custom authorizer configs or other access - * rules apart from ACLs. - * - * @param requestContext Request context including request resourceType, security protocol and listener name - * @param op The ACL operation to check - * @param resourceType The resource type to check - * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized - * to perform the given ACL operation on at least one resource of the - * given type. Return {@link AuthorizationResult#DENIED} otherwise. - */ - default AuthorizationResult authorizeByResourceType1(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { - SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); - - // Check a hard-coded name to ensure that super users are granted - // access regardless of DENY ACLs. - if (authorize(requestContext, Collections.singletonList(new Action( - op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), - 0, true, false))) - .get(0) == AuthorizationResult.ALLOWED) { - return AuthorizationResult.ALLOWED; - } - - // Filter out all the resource pattern corresponding to the RequestContext, - // AclOperation, and ResourceType - ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter( - resourceType, null, PatternType.ANY); - AclBindingFilter aclFilter = new AclBindingFilter( - resourceTypeFilter, AccessControlEntryFilter.ANY); - - EnumMap> denyPatterns = - new EnumMap<>(PatternType.class) {{ - put(PatternType.LITERAL, new HashSet<>()); - put(PatternType.PREFIXED, new HashSet<>()); - }}; - EnumMap> allowPatterns = - new EnumMap<>(PatternType.class) {{ - put(PatternType.LITERAL, new HashSet<>()); - put(PatternType.PREFIXED, new HashSet<>()); - }}; - - boolean hasWildCardAllow = false; - - KafkaPrincipal principal = new KafkaPrincipal( - requestContext.principal().getPrincipalType(), - requestContext.principal().getName()); - String hostAddr = requestContext.clientAddress().getHostAddress(); - - for (AclBinding binding : acls(aclFilter)) { - if (!binding.entry().host().equals(hostAddr) && !binding.entry().host().equals("*")) - continue; - - if (!SecurityUtils.parseKafkaPrincipal(binding.entry().principal()).equals(principal) - && !binding.entry().principal().equals("User:*")) - continue; - - if (binding.entry().operation() != op - && binding.entry().operation() != AclOperation.ALL) - continue; - - if (binding.entry().permissionType() == AclPermissionType.DENY) { - switch (binding.pattern().patternType()) { - case LITERAL: - // If wildcard deny exists, return deny directly - if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) - return AuthorizationResult.DENIED; - denyPatterns.get(PatternType.LITERAL).add(binding.pattern().name()); - break; - case PREFIXED: - denyPatterns.get(PatternType.PREFIXED).add(binding.pattern().name()); - break; - default: - } - continue; - } - - if (binding.entry().permissionType() != AclPermissionType.ALLOW) - continue; - - switch (binding.pattern().patternType()) { - case LITERAL: - if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) { - hasWildCardAllow = true; - continue; - } - allowPatterns.get(PatternType.LITERAL).add(binding.pattern().name()); - break; - case PREFIXED: - allowPatterns.get(PatternType.PREFIXED).add(binding.pattern().name()); - break; - default: - } - } - - if (hasWildCardAllow) { - return AuthorizationResult.ALLOWED; - } - - // For any literal allowed, if there's no dominant literal and prefix denied, return allow. - // For any prefix allowed, if there's no dominant prefix denied, return allow. - for (Map.Entry> entry : allowPatterns.entrySet()) { - for (String allowStr : entry.getValue()) { - if (entry.getKey() == PatternType.LITERAL - && denyPatterns.get(PatternType.LITERAL).contains(allowStr)) - continue; - StringBuilder sb = new StringBuilder(); - boolean hasDominatedDeny = false; - for (char ch : allowStr.toCharArray()) { - sb.append(ch); - if (denyPatterns.get(PatternType.PREFIXED).contains(sb.toString())) { - hasDominatedDeny = true; - break; - } - } - if (!hasDominatedDeny) - return AuthorizationResult.ALLOWED; - } - } - - return AuthorizationResult.DENIED; - } - default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); @@ -381,11 +250,6 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), if(!denyPatternsPrefixed.isEmpty()) { hasDominatedDeny = !denyPatternsPrefixed.headMap(allowStr).isEmpty() || denyPatternsPrefixed.containsKey(allowStr); } - - // String denyPrefix = denyPatternsPrefixed.selectKey(allowStr); - // boolean hasDominatedDeny = (denyPrefix != null); - - // boolean hasDominatedDeny = !denyPatternsPrefixed.prefixMap(allowStr).isEmpty(); if (!hasDominatedDeny) return AuthorizationResult.ALLOWED; From fa35e6cf0f57c1930ee03f7971b370f42909f8ac Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Tue, 18 Nov 2025 13:29:16 +0300 Subject: [PATCH 05/11] KAFKA-19782 Add Patricia Trie to Authorizer add test --- .../kafka/server/authorizer/Authorizer.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java index c77ebc9febc85..36cf83221862e 100644 --- a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java +++ b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java @@ -157,6 +157,26 @@ default int aclCount() { return -1; } + /** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * Custom authorizer implementations should consider overriding this default implementation because: + * 1. The default implementation iterates all AclBindings multiple times, without any caching + * by principal, host, operation, permission types, and resource types. More efficient + * implementations may be added in custom authorizers that directly access cached entries. + * 2. The default implementation cannot integrate with any audit logging included in the + * authorizer implementation. + * 3. The default implementation does not support any custom authorizer configs or other access + * rules apart from ACLs. + * + * @param requestContext Request context including request resourceType, security protocol and listener name + * @param op The ACL operation to check + * @param resourceType The resource type to check + * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized + * to perform the given ACL operation on at least one resource of the + * given type. Return {@link AuthorizationResult#DENIED} otherwise. + */ default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); @@ -251,6 +271,11 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), hasDominatedDeny = !denyPatternsPrefixed.headMap(allowStr).isEmpty() || denyPatternsPrefixed.containsKey(allowStr); } + // String denyPrefix = denyPatternsPrefixed.selectKey(allowStr); + // boolean hasDominatedDeny = (denyPrefix != null); + + // boolean hasDominatedDeny = !denyPatternsPrefixed.prefixMap(allowStr).isEmpty(); + if (!hasDominatedDeny) return AuthorizationResult.ALLOWED; } From d22a32f80fcb85588060ab6b8d37bce944ed3ca8 Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Thu, 27 Nov 2025 00:05:31 +0300 Subject: [PATCH 06/11] Fix errors with finding prefix --- .../kafka/server/authorizer/Authorizer.java | 32 +++++++++++++------ 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java index 36cf83221862e..67753f14b4f98 100644 --- a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java +++ b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java @@ -267,15 +267,12 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), boolean hasDominatedDeny = false; - if(!denyPatternsPrefixed.isEmpty()) { - hasDominatedDeny = !denyPatternsPrefixed.headMap(allowStr).isEmpty() || denyPatternsPrefixed.containsKey(allowStr); + if (!denyPatternsPrefixed.isEmpty()) { + if (hasTail(denyPatternsPrefixed, allowStr) || hasHead(denyPatternsPrefixed, allowStr)) { + hasDominatedDeny = true; + } } - // String denyPrefix = denyPatternsPrefixed.selectKey(allowStr); - // boolean hasDominatedDeny = (denyPrefix != null); - - // boolean hasDominatedDeny = !denyPatternsPrefixed.prefixMap(allowStr).isEmpty(); - if (!hasDominatedDeny) return AuthorizationResult.ALLOWED; } @@ -284,14 +281,29 @@ op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), for (String allowStr : allowPatternsPrefixed.keySet()) { boolean hasDominatedDeny = false; - if(!denyPatternsPrefixed.isEmpty()) { - hasDominatedDeny = !denyPatternsPrefixed.headMap(allowStr).isEmpty() || denyPatternsPrefixed.containsKey(allowStr); + if (!denyPatternsPrefixed.isEmpty()) { + if (hasTail(denyPatternsPrefixed, allowStr) || hasHead(denyPatternsPrefixed, allowStr)) { + hasDominatedDeny = true; + } } if (!hasDominatedDeny) return AuthorizationResult.ALLOWED; } - return AuthorizationResult.DENIED; } + + private boolean hasTail(PatriciaTrie denyPatternsPrefixed, String allowStr) { + var t = denyPatternsPrefixed.tailMap(allowStr).entrySet().iterator(); + return t.hasNext() && t.next().getKey().equals(allowStr); + } + + private boolean hasHead(PatriciaTrie denyPatternsPrefixed, String allowStr) { + try { + String lastKey = denyPatternsPrefixed.headMap(allowStr).lastKey(); + return allowStr.startsWith(lastKey); + } catch (NoSuchElementException e) { + return false; + } + } } From b5b4f5214745f61a82f41dae0b60edb91f001184 Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Sun, 7 Dec 2025 17:38:17 +0300 Subject: [PATCH 07/11] Add tests for PatriciaTrie --- .../acl/AuthorizeByResourceTypeSearch.java | 215 ++++++++++++++++++ .../kafka/jmh/acl/AuthorizerBenchmark.java | 8 +- 2 files changed, 216 insertions(+), 7 deletions(-) create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizeByResourceTypeSearch.java diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizeByResourceTypeSearch.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizeByResourceTypeSearch.java new file mode 100644 index 0000000000000..4bc58a3d88b85 --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizeByResourceTypeSearch.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.jmh.acl; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.acl.*; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.internals.PluginMetricsImpl; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.metadata.authorizer.StandardAcl; +import org.apache.kafka.metadata.authorizer.StandardAuthorizer; +import org.apache.kafka.server.authorizer.AuthorizationResult; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.TimeValue; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.*; +import java.util.concurrent.TimeUnit; + +/** + * Benchmark (resourceNamePrefix) Mode Cnt Score Error Units + * AuthorizeByResourceTypeSearch.testAuthorizeByResourceType AuthorizeByResourceTypeSearch- avgt 7 4.252 ± 0.024 us/op + * AuthorizeByResourceTypeSearch.testAuthorizeByResourceType Authorize...Check- avgt 7 4.301 ± 0.030 us/op + * AuthorizeByResourceTypeSearch.testAuthorizeByResourceType Authorize...Difference- avgt 7 4.592 ± 0.042 us/op + *

+ * + */ +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 15) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class AuthorizeByResourceTypeSearch { + + @Param({ + "AuthorizeByResourceTypeSearch-", + "AuthorizeByResourceTypeSearchOneMoreWordLongForDenyPatternCheck-", + "AuthorizeByResourceTypeSearchOneMoreWordLongForDenyPatternCheckAndWeAddOneMoreWordJustInCaseToShowBenchmarkDifference-", + }) + String resourceNamePrefix; + + @Param({"4","10"}) + int typeOfPrefixedAndLiteralPattern; + + private KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user"); + private String authorizeByResourceTypeHostName = "127.0.0.2"; + private StandardAuthorizer authorizer; + private RequestContext authorizeByResourceTypeContext; + private AclBindingFilter filter; + private AclOperation op; + private ResourceType resourceType; + + @Setup(Level.Trial) + public void setup() throws Exception { + authorizer = new StandardAuthorizer(); + filter = AclBindingFilter.ANY; + op = AclOperation.READ; + resourceType = ResourceType.TOPIC; + prepareAclCache(); + authorizeByResourceTypeContext = new RequestContext(new RequestHeader(ApiKeys.PRODUCE, Integer.valueOf(1).shortValue(), + "someclient", 1), "1", InetAddress.getByName(authorizeByResourceTypeHostName), principal, + ListenerName.normalised("listener"), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false); + } + + + /** + * What we do in this test: + *

+ * For every Allow Literal Pattern -- iterate on count of typeOfPrefixedAndLiteralPattern + * check in denyPatternsLiteral map -- check on exists then no found + * check in denyPatternsPrefixed -- find in denyPatternsPrefixed prefix + * return if allow (in this test we don't find) + *

+ * for every Allow Prefix pattern + * check in denyPatternsPrefixed -- find in denyPatternsPrefixed prefix + * return allow + *

+ * return deny - never reach + *

+ * We iterate + * typeOfPrefixedAndLiteralPattern + 1 counts. + * Make (typeOfPrefixedAndLiteralPattern + 1) * 2 search in PatriciaTrie + */ + private void prepareAclCache() { + Map> aclEntries = new HashMap<>(); + + String prefix = "a"; + + List patterns = new ArrayList<>(); + for(int i = 0; i< typeOfPrefixedAndLiteralPattern; i++) { + patterns.add(resourceNamePrefix + prefix.repeat(i+1)); + } + + String allowPattern = resourceNamePrefix; + + for(String pattern : patterns) { + // PREFIX DENY + makeDeny(pattern, aclEntries, PatternType.PREFIXED); + // ALLOW LITERAL + makeAllow(pattern, aclEntries, PatternType.LITERAL); + } + + // Add one Allow + makeAllow(allowPattern, aclEntries, PatternType.PREFIXED); + + + // makeDeny(denyPattern1, aclEntries); + setupAcls(aclEntries); + } + + private void makeDeny(String denyPattern, Map> aclEntries, PatternType patternType) { + ResourcePattern resource = new ResourcePattern(ResourceType.TOPIC, denyPattern, + patternType); + + Set entries = aclEntries.computeIfAbsent(resource, k -> new HashSet<>()); + + AccessControlEntry denyAce = new AccessControlEntry(principal.toString(), authorizeByResourceTypeHostName, + AclOperation.READ, AclPermissionType.DENY); + + entries.add(denyAce); + } + + private void makeAllow(String denyPattern, Map> aclEntries, PatternType patternType) { + ResourcePattern resourceAllow = new ResourcePattern(ResourceType.TOPIC, denyPattern, + patternType); + + Set entriesAllow = aclEntries.computeIfAbsent(resourceAllow, k -> new HashSet<>()); + + AccessControlEntry allowAce = new AccessControlEntry(principal.toString(), authorizeByResourceTypeHostName, + AclOperation.READ, AclPermissionType.ALLOW); + + entriesAllow.add(allowAce); + } + + private void setupAcls(Map> aclEntries) { + for (Map.Entry> entryMap : aclEntries.entrySet()) { + ResourcePattern resourcePattern = entryMap.getKey(); + + for (AccessControlEntry accessControlEntry : entryMap.getValue()) { + StandardAcl standardAcl = StandardAcl.fromAclBinding(new AclBinding(resourcePattern, accessControlEntry)); + authorizer.addAcl(Uuid.randomUuid(), standardAcl); + } + authorizer.completeInitialLoad(); + + } + } + + @Setup(Level.Iteration) + public void setupIteration() { + authorizer.withPluginMetrics(new PluginMetricsImpl(new Metrics(), new HashMap<>(1000000))); + } + + @TearDown(Level.Trial) + public void tearDown() throws IOException { + authorizer.close(); + } + + @Benchmark + public AuthorizationResult testAuthorizeByResourceType() { + return authorizer.authorizeByResourceType(authorizeByResourceTypeContext, op, resourceType); + } + + @Benchmark + public AuthorizationResult testAuthorizeByResourceTypeOld() { + return authorizer.authorizeByResourceType1(authorizeByResourceTypeContext, op, resourceType); + } + + public static void main(String[] args) { + Options opt = new OptionsBuilder() + .include(AuthorizeByResourceTypeSearch.class.getSimpleName()) + .warmupIterations(7) + .warmupTime(TimeValue.seconds(1)) + .measurementIterations(7) + .measurementTime(TimeValue.seconds(1)) + .timeUnit(TimeUnit.MICROSECONDS) + .forks(1) + .build(); + try { + new Runner(opt).run(); + } catch (RunnerException e) { + e.printStackTrace(); + } + } +} diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java index 14ef2389e2211..dfa9d914c6bd5 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java @@ -53,12 +53,6 @@ import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Warmup; -import org.openjdk.jmh.profile.GCProfiler; -import org.openjdk.jmh.runner.Runner; -import org.openjdk.jmh.runner.RunnerException; -import org.openjdk.jmh.runner.options.Options; -import org.openjdk.jmh.runner.options.OptionsBuilder; -import org.openjdk.jmh.runner.options.TimeValue; import java.io.IOException; import java.net.InetAddress; @@ -79,7 +73,7 @@ @OutputTimeUnit(TimeUnit.MILLISECONDS) public class AuthorizerBenchmark { - @Param({"10000", "50000"}) + @Param({"10000", "50000", "200000"}) private int resourceCount; //no. of. rules per resource @Param({"10", "50"}) From 18a66677cf8d1928692a66196ad4dca5d7ac8a4a Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Sun, 7 Dec 2025 17:38:17 +0300 Subject: [PATCH 08/11] Add tests for PatriciaTrie --- .../acl/AuthorizeByResourceTypeSearch.java | 210 ++++++++++++++++++ .../kafka/jmh/acl/AuthorizerBenchmark.java | 8 +- 2 files changed, 211 insertions(+), 7 deletions(-) create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizeByResourceTypeSearch.java diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizeByResourceTypeSearch.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizeByResourceTypeSearch.java new file mode 100644 index 0000000000000..bca27823ae6c0 --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizeByResourceTypeSearch.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.jmh.acl; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.acl.*; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.internals.PluginMetricsImpl; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.metadata.authorizer.StandardAcl; +import org.apache.kafka.metadata.authorizer.StandardAuthorizer; +import org.apache.kafka.server.authorizer.AuthorizationResult; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.TimeValue; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.*; +import java.util.concurrent.TimeUnit; + +/** + * Benchmark (resourceNamePrefix) Mode Cnt Score Error Units + * AuthorizeByResourceTypeSearch.testAuthorizeByResourceType AuthorizeByResourceTypeSearch- avgt 7 4.252 ± 0.024 us/op + * AuthorizeByResourceTypeSearch.testAuthorizeByResourceType Authorize...Check- avgt 7 4.301 ± 0.030 us/op + * AuthorizeByResourceTypeSearch.testAuthorizeByResourceType Authorize...Difference- avgt 7 4.592 ± 0.042 us/op + *

+ * + */ +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 15) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class AuthorizeByResourceTypeSearch { + + @Param({ + "AuthorizeByResourceTypeSearch-", + "AuthorizeByResourceTypeSearchOneMoreWordLongForDenyPatternCheck-", + "AuthorizeByResourceTypeSearchOneMoreWordLongForDenyPatternCheckAndWeAddOneMoreWordJustInCaseToShowBenchmarkDifference-", + }) + String resourceNamePrefix; + + @Param({"4","10"}) + int typeOfPrefixedAndLiteralPattern; + + private KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user"); + private String authorizeByResourceTypeHostName = "127.0.0.2"; + private StandardAuthorizer authorizer; + private RequestContext authorizeByResourceTypeContext; + private AclBindingFilter filter; + private AclOperation op; + private ResourceType resourceType; + + @Setup(Level.Trial) + public void setup() throws Exception { + authorizer = new StandardAuthorizer(); + filter = AclBindingFilter.ANY; + op = AclOperation.READ; + resourceType = ResourceType.TOPIC; + prepareAclCache(); + authorizeByResourceTypeContext = new RequestContext(new RequestHeader(ApiKeys.PRODUCE, Integer.valueOf(1).shortValue(), + "someclient", 1), "1", InetAddress.getByName(authorizeByResourceTypeHostName), principal, + ListenerName.normalised("listener"), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false); + } + + + /** + * What we do in this test: + *

+ * For every Allow Literal Pattern -- iterate on count of typeOfPrefixedAndLiteralPattern + * check in denyPatternsLiteral map -- check on exists then no found + * check in denyPatternsPrefixed -- find in denyPatternsPrefixed prefix + * return if allow (in this test we don't find) + *

+ * for every Allow Prefix pattern + * check in denyPatternsPrefixed -- find in denyPatternsPrefixed prefix + * return allow + *

+ * return deny - never reach + *

+ * We iterate + * typeOfPrefixedAndLiteralPattern + 1 counts. + * Make (typeOfPrefixedAndLiteralPattern + 1) * 2 search in PatriciaTrie + */ + private void prepareAclCache() { + Map> aclEntries = new HashMap<>(); + + String prefix = "a"; + + List patterns = new ArrayList<>(); + for(int i = 0; i< typeOfPrefixedAndLiteralPattern; i++) { + patterns.add(resourceNamePrefix + prefix.repeat(i+1)); + } + + String allowPattern = resourceNamePrefix; + + for(String pattern : patterns) { + // PREFIX DENY + makeDeny(pattern, aclEntries, PatternType.PREFIXED); + // ALLOW LITERAL + makeAllow(pattern, aclEntries, PatternType.LITERAL); + } + + // Add one Allow + makeAllow(allowPattern, aclEntries, PatternType.PREFIXED); + + + // makeDeny(denyPattern1, aclEntries); + setupAcls(aclEntries); + } + + private void makeDeny(String denyPattern, Map> aclEntries, PatternType patternType) { + ResourcePattern resource = new ResourcePattern(ResourceType.TOPIC, denyPattern, + patternType); + + Set entries = aclEntries.computeIfAbsent(resource, k -> new HashSet<>()); + + AccessControlEntry denyAce = new AccessControlEntry(principal.toString(), authorizeByResourceTypeHostName, + AclOperation.READ, AclPermissionType.DENY); + + entries.add(denyAce); + } + + private void makeAllow(String denyPattern, Map> aclEntries, PatternType patternType) { + ResourcePattern resourceAllow = new ResourcePattern(ResourceType.TOPIC, denyPattern, + patternType); + + Set entriesAllow = aclEntries.computeIfAbsent(resourceAllow, k -> new HashSet<>()); + + AccessControlEntry allowAce = new AccessControlEntry(principal.toString(), authorizeByResourceTypeHostName, + AclOperation.READ, AclPermissionType.ALLOW); + + entriesAllow.add(allowAce); + } + + private void setupAcls(Map> aclEntries) { + for (Map.Entry> entryMap : aclEntries.entrySet()) { + ResourcePattern resourcePattern = entryMap.getKey(); + + for (AccessControlEntry accessControlEntry : entryMap.getValue()) { + StandardAcl standardAcl = StandardAcl.fromAclBinding(new AclBinding(resourcePattern, accessControlEntry)); + authorizer.addAcl(Uuid.randomUuid(), standardAcl); + } + authorizer.completeInitialLoad(); + + } + } + + @Setup(Level.Iteration) + public void setupIteration() { + authorizer.withPluginMetrics(new PluginMetricsImpl(new Metrics(), new HashMap<>(1000000))); + } + + @TearDown(Level.Trial) + public void tearDown() throws IOException { + authorizer.close(); + } + + @Benchmark + public AuthorizationResult testAuthorizeByResourceType() { + return authorizer.authorizeByResourceType(authorizeByResourceTypeContext, op, resourceType); + } + + public static void main(String[] args) { + Options opt = new OptionsBuilder() + .include(AuthorizeByResourceTypeSearch.class.getSimpleName()) + .warmupIterations(7) + .warmupTime(TimeValue.seconds(1)) + .measurementIterations(7) + .measurementTime(TimeValue.seconds(1)) + .timeUnit(TimeUnit.MICROSECONDS) + .forks(1) + .build(); + try { + new Runner(opt).run(); + } catch (RunnerException e) { + e.printStackTrace(); + } + } +} diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java index 14ef2389e2211..dfa9d914c6bd5 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java @@ -53,12 +53,6 @@ import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Warmup; -import org.openjdk.jmh.profile.GCProfiler; -import org.openjdk.jmh.runner.Runner; -import org.openjdk.jmh.runner.RunnerException; -import org.openjdk.jmh.runner.options.Options; -import org.openjdk.jmh.runner.options.OptionsBuilder; -import org.openjdk.jmh.runner.options.TimeValue; import java.io.IOException; import java.net.InetAddress; @@ -79,7 +73,7 @@ @OutputTimeUnit(TimeUnit.MILLISECONDS) public class AuthorizerBenchmark { - @Param({"10000", "50000"}) + @Param({"10000", "50000", "200000"}) private int resourceCount; //no. of. rules per resource @Param({"10", "50"}) From e56b4779a29dcc64234a35312c713b189994e76b Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Thu, 15 Jan 2026 19:51:54 +0300 Subject: [PATCH 09/11] Fix checkstyle errors --- .../java/org/apache/kafka/server/authorizer/Authorizer.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java index 67753f14b4f98..284842e2dc496 100644 --- a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java +++ b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java @@ -17,7 +17,6 @@ package org.apache.kafka.server.authorizer; -import org.apache.commons.collections4.trie.PatriciaTrie; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.Endpoint; import org.apache.kafka.common.acl.AccessControlEntryFilter; @@ -32,6 +31,8 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.utils.SecurityUtils; +import org.apache.commons.collections4.trie.PatriciaTrie; + import java.io.Closeable; import java.util.*; import java.util.concurrent.CompletionStage; From 66a63e76776a405bb19ccd61bd2f0d8acb3c0ab0 Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Thu, 30 Apr 2026 14:44:31 +0300 Subject: [PATCH 10/11] Fix checkstyle errors --- .../org/apache/kafka/server/authorizer/Authorizer.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java index 284842e2dc496..fce976b5e3901 100644 --- a/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java +++ b/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java @@ -34,7 +34,12 @@ import org.apache.commons.collections4.trie.PatriciaTrie; import java.io.Closeable; -import java.util.*; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; import java.util.concurrent.CompletionStage; /** From 2109a6d898f952cd920c075aa4ba32ab16a1fb7f Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Sat, 27 Jun 2026 19:28:21 +0300 Subject: [PATCH 11/11] Move test --- .../security/authorizer/AuthorizerTest.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/server/src/test/java/org/apache/kafka/security/authorizer/AuthorizerTest.java b/server/src/test/java/org/apache/kafka/security/authorizer/AuthorizerTest.java index 926d185548da3..ee684ec610bb0 100644 --- a/server/src/test/java/org/apache/kafka/security/authorizer/AuthorizerTest.java +++ b/server/src/test/java/org/apache/kafka/security/authorizer/AuthorizerTest.java @@ -234,6 +234,29 @@ public void testAuthorizeByResourceTypeIsolationUnrelatedDenyWontDominateAllow() "User1 from host2 should have READ access to at least one topic"); } + @Test + public void testAuthorizeByResourceTypeLiteralResourceDenyDominate() throws Exception { + KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1"); + InetAddress host1 = InetAddress.getByName("192.168.1.1"); + + ResourcePattern abcd = new ResourcePattern(GROUP, "abcd", PREFIXED); + ResourcePattern abcde = new ResourcePattern(GROUP, "abcde", LITERAL); + + RequestContext u1h1Context = newRequestContext(user1, host1); + + AccessControlEntry allowAce = new AccessControlEntry(user1.toString(), host1.getHostAddress(), READ, ALLOW); + AccessControlEntry denyAce = new AccessControlEntry(user1.toString(), host1.getHostAddress(), READ, DENY); + + addAcls(authorizer, Set.of(allowAce), abcde); + + assertTrue(authorizeByResourceType(authorizer, u1h1Context, READ, ResourceType.GROUP), + "User1 from host1 should have READ access to at least one group"); + + addAcls(authorizer, Set.of(denyAce), abcd); + assertFalse(authorizeByResourceType(authorizer, u1h1Context, READ, ResourceType.GROUP), + "User1 from host1 now should not have READ access to any group"); + } + @Test public void testAuthorizeByResourceTypeDenyTakesPrecedence() throws Exception { KafkaPrincipal user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1");