From f80cc11bea7133e4f239813d8ed05bb9a047e468 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Thu, 11 May 2023 00:36:40 -0500 Subject: [PATCH] [improve][ws] Use allowTopicOperationAsync for authz checks --- .../pulsar/websocket/ConsumerHandler.java | 20 +++++++++++++++++-- .../pulsar/websocket/ProducerHandler.java | 16 ++++++++++++++- .../pulsar/websocket/ReaderHandler.java | 20 +++++++++++++++++-- 3 files changed, 51 insertions(+), 5 deletions(-) diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java index c988fd1e70ce3..e326f2f02a901 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java @@ -19,6 +19,7 @@ package org.apache.pulsar.websocket; import static com.google.common.base.Preconditions.checkArgument; +import static java.util.concurrent.TimeUnit.SECONDS; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.base.Enums; import com.google.common.base.Splitter; @@ -33,6 +34,7 @@ import java.util.concurrent.atomic.LongAdder; import javax.servlet.http.HttpServletRequest; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; @@ -47,6 +49,7 @@ import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.impl.ConsumerBuilderImpl; import org.apache.pulsar.client.impl.TopicMessageIdImpl; +import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.websocket.data.ConsumerCommand; @@ -467,8 +470,21 @@ protected ConsumerBuilder getConsumerConfiguration(PulsarClient client) @Override protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception { - return service.getAuthorizationService().canConsume(topic, authRole, authenticationData, - this.subscription); + try { + AuthenticationDataSubscription subscription = new AuthenticationDataSubscription(authenticationData, + this.subscription); + return service.getAuthorizationService() + .allowTopicOperationAsync(topic, TopicOperation.CONSUME, authRole, subscription) + .get(service.getConfig().getMetadataStoreOperationTimeoutSeconds(), SECONDS); + } catch (InterruptedException e) { + log.warn("Time-out {} sec while checking authorization on {} ", + service.getConfig().getMetadataStoreOperationTimeoutSeconds(), topic); + throw e; + } catch (Exception e) { + log.warn("Consumer-client with Role - {} failed to get permissions for topic - {}. {}", authRole, topic, + e.getMessage()); + throw e; + } } public static String extractSubscription(HttpServletRequest request) { diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java index 1dc3f202fe07d..f18d46e285617 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.lang.String.format; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.pulsar.websocket.WebSocketError.FailedToDeserializeFromJSON; import static org.apache.pulsar.websocket.WebSocketError.PayloadEncodingError; import static org.apache.pulsar.websocket.WebSocketError.UnknownError; @@ -45,6 +46,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.websocket.data.ProducerAck; @@ -242,7 +244,19 @@ public long getMsgPublishedCounter() { @Override protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception { - return service.getAuthorizationService().canProduce(topic, authRole, authenticationData); + try { + return service.getAuthorizationService() + .allowTopicOperationAsync(topic, TopicOperation.PRODUCE, authRole, authenticationData) + .get(service.getConfig().getMetadataStoreOperationTimeoutSeconds(), SECONDS); + } catch (InterruptedException e) { + log.warn("Time-out {} sec while checking authorization on {} ", + service.getConfig().getMetadataStoreOperationTimeoutSeconds(), topic); + throw e; + } catch (Exception e) { + log.warn("Producer-client with Role - {} failed to get permissions for topic - {}. {}", authRole, topic, + e.getMessage()); + throw e; + } } private void sendAckResponse(ProducerAck response) { diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java index 890f426a9bbf3..3d726b9f02f76 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.websocket; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.commons.lang3.StringUtils.isNotBlank; import com.fasterxml.jackson.core.JsonProcessingException; import java.io.IOException; @@ -28,6 +29,7 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.MessageId; @@ -38,6 +40,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MultiTopicsReaderImpl; import org.apache.pulsar.client.impl.ReaderImpl; +import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.websocket.data.ConsumerCommand; import org.apache.pulsar.websocket.data.ConsumerMessage; @@ -310,8 +313,21 @@ protected void updateDeliverMsgStat(long msgSize) { @Override protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception { - return service.getAuthorizationService().canConsume(topic, authRole, authenticationData, - this.subscription); + try { + AuthenticationDataSubscription subscription = new AuthenticationDataSubscription(authenticationData, + this.subscription); + return service.getAuthorizationService() + .allowTopicOperationAsync(topic, TopicOperation.CONSUME, authRole, subscription) + .get(service.getConfig().getMetadataStoreOperationTimeoutSeconds(), SECONDS); + } catch (InterruptedException e) { + log.warn("Time-out {} sec while checking authorization on {} ", + service.getConfig().getMetadataStoreOperationTimeoutSeconds(), topic); + throw e; + } catch (Exception e) { + log.warn("Consumer-client with Role - {} failed to get permissions for topic - {}. {}", authRole, topic, + e.getMessage()); + throw e; + } } private int getReceiverQueueSize() {