diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java index f51e96ca9a779..77617d3fd20ee 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java @@ -26,6 +26,8 @@ import java.util.concurrent.CompletableFuture; import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; +import javax.servlet.AsyncContext; +import javax.servlet.ServletRequest; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.pulsar.broker.ServiceConfiguration; @@ -135,8 +137,16 @@ default AuthenticationState newHttpAuthState(HttpServletRequest request) */ default CompletableFuture authenticateHttpRequestAsync(HttpServletRequest request, HttpServletResponse response) { + AsyncContext ctx = request.getAsyncContext(); try { - return CompletableFuture.completedFuture(this.authenticateHttpRequest(request, response)); + final AuthenticationDataSource authDataSource = newHttpAuthState(request).getAuthDataSource(); + return authenticateAsync(authDataSource) + .thenApply(role -> { + ServletRequest r = ctx.getRequest(); + r.setAttribute(AuthenticatedRoleAttributeName, role); + r.setAttribute(AuthenticatedDataAttributeName, authDataSource); + return true; + }); } catch (Exception e) { return FutureUtil.failedFuture(e); } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java index 5f92453238f4d..0a077ca6c1887 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import javax.naming.AuthenticationException; import javax.servlet.http.HttpServletRequest; @@ -102,56 +103,60 @@ private AuthenticationProvider getAuthProvider(String authMethodName) throws Aut return providerToUse; } - public boolean authenticateHttpRequest(HttpServletRequest request, HttpServletResponse response) - throws Exception { + /** + * + * @param request + * @param response + * @return + */ + public CompletableFuture authenticateHttpRequestAsync(HttpServletRequest request, + HttpServletResponse response) { String authMethodName = getAuthMethodName(request); + authMethodName = authMethodName == null ? "BasicAuthentication" : authMethodName; if (authMethodName == null && SaslConstants.SASL_TYPE_VALUE.equalsIgnoreCase(request.getHeader(SaslConstants.SASL_HEADER_TYPE))) { // This edge case must be handled because the Pulsar SASL implementation does not add the // X-Pulsar-Auth-Method-Name header. authMethodName = SaslConstants.AUTH_METHOD_NAME; } - if (authMethodName != null) { - AuthenticationProvider providerToUse = getAuthProvider(authMethodName); - try { - return providerToUse.authenticateHttpRequest(request, response); - } catch (AuthenticationException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Authentication failed for provider " + providerToUse.getAuthMethodName() + " : " - + e.getMessage(), e); - } - throw e; +// if (authMethodName != null) { + AuthenticationProvider providerToUse = providers.get(authMethodName); + if (providerToUse == null) { + return CompletableFuture.failedFuture(new AuthenticationException( + String.format("Unsupported authentication method: [%s].", authMethodName))); } - } else { - for (AuthenticationProvider provider : providers.values()) { - try { - return provider.authenticateHttpRequest(request, response); - } catch (AuthenticationException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Authentication failed for provider " + provider.getAuthMethodName() + ": " - + e.getMessage(), e); - } - // Ignore the exception because we don't know which authentication method is expected here. - } - } - // No authentication provided - if (!providers.isEmpty()) { - if (StringUtils.isNotBlank(anonymousUserRole)) { - request.setAttribute(AuthenticatedRoleAttributeName, anonymousUserRole); - request.setAttribute(AuthenticatedDataAttributeName, new AuthenticationDataHttps(request)); - return true; - } - // If at least a provider was configured, then the authentication needs to be provider - throw new AuthenticationException("Authentication required"); - } else { - // No authentication required - return true; - } - } + return providerToUse.authenticateHttpRequestAsync(request, response); + // todo how to handle exceptional case? +// } else { +// for (AuthenticationProvider provider : providers.values()) { +// try { +// return provider.authenticateHttpRequest(request, response); +// } catch (AuthenticationException e) { +// if (LOG.isDebugEnabled()) { +// LOG.debug("Authentication failed for provider " + provider.getAuthMethodName() + ": " +// + e.getMessage(), e); +// } +// // Ignore the exception because we don't know which authentication method is expected here. +// } +// } +// // No authentication provided +// if (!providers.isEmpty()) { +// if (StringUtils.isNotBlank(anonymousUserRole)) { +// request.setAttribute(AuthenticatedRoleAttributeName, anonymousUserRole); +// request.setAttribute(AuthenticatedDataAttributeName, new AuthenticationDataHttps(request)); +// return true; +// } +// // If at least a provider was configured, then the authentication needs to be provider +// throw new AuthenticationException("Authentication required"); +// } else { +// // No authentication required +// return true; +// } +// } } /** - * @deprecated use {@link #authenticateHttpRequest(HttpServletRequest, HttpServletResponse)} + * @deprecated use {@link #authenticateHttpRequestAsync(HttpServletRequest, HttpServletResponse)} */ @Deprecated(since = "2.12.0") public String authenticateHttpRequest(HttpServletRequest request, AuthenticationDataSource authData) @@ -159,7 +164,11 @@ public String authenticateHttpRequest(HttpServletRequest request, Authentication String authMethodName = getAuthMethodName(request); if (authMethodName != null) { - AuthenticationProvider providerToUse = getAuthProvider(authMethodName); + AuthenticationProvider providerToUse = providers.get(authMethodName); + if (providerToUse == null) { + throw new AuthenticationException( + String.format("Unsupported authentication method: [%s].", authMethodName)); + } try { if (authData == null) { AuthenticationState authenticationState = providerToUse.newHttpAuthState(request); @@ -205,7 +214,7 @@ public String authenticateHttpRequest(HttpServletRequest request, Authentication /** * Mark this function as deprecated, it is recommended to use a method with the AuthenticationDataSource * signature to implement it. - * @deprecated use {@link #authenticateHttpRequest(HttpServletRequest, HttpServletResponse)}. + * @deprecated use {@link #authenticateHttpRequestAsync(HttpServletRequest, HttpServletResponse)}. */ @Deprecated public String authenticateHttpRequest(HttpServletRequest request) throws AuthenticationException { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java index 6f13185ca7540..49815b6e3c19a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java @@ -20,6 +20,9 @@ import java.io.IOException; import javax.naming.AuthenticationException; +import javax.servlet.AsyncContext; +import javax.servlet.AsyncEvent; +import javax.servlet.AsyncListener; import javax.servlet.Filter; import javax.servlet.FilterChain; import javax.servlet.FilterConfig; @@ -52,20 +55,69 @@ public AuthenticationFilter(AuthenticationService authenticationService) { @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { - try { - boolean doFilter = authenticationService - .authenticateHttpRequest((HttpServletRequest) request, (HttpServletResponse) response); - if (doFilter) { - chain.doFilter(request, response); + if (request.getAttribute(AuthenticatedRoleAttributeName) != null) { + chain.doFilter(request, response); + return; + } + AsyncContext asyncContext = request.startAsync(); + asyncContext.addListener(new AsyncListener() { + @Override + public void onComplete(AsyncEvent event) throws IOException { + try { + chain.doFilter(event.getSuppliedRequest(), event.getSuppliedResponse()); + } catch (ServletException e) { + throw new RuntimeException(e); + } } - } catch (Exception e) { - HttpServletResponse httpResponse = (HttpServletResponse) response; - httpResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Authentication required"); - if (e instanceof AuthenticationException) { - LOG.warn("[{}] Failed to authenticate HTTP request: {}", request.getRemoteAddr(), e.getMessage()); - } else { - LOG.error("[{}] Error performing authentication for HTTP", request.getRemoteAddr(), e); + + @Override + public void onTimeout(AsyncEvent event) throws IOException { + + } + + @Override + public void onError(AsyncEvent event) throws IOException { + } + + @Override + public void onStartAsync(AsyncEvent event) throws IOException { + + } + }); + authenticationService + .authenticateHttpRequestAsync((HttpServletRequest) request, (HttpServletResponse) response) + .whenComplete((doFilter, throwable) -> { + if (throwable != null) { + try { + HttpServletResponse httpResponse = (HttpServletResponse) asyncContext.getResponse(); + httpResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Authentication required"); + if (throwable instanceof AuthenticationException) { + LOG.warn("[{}] Failed to authenticate HTTP request: {}", request.getRemoteAddr(), + throwable.getMessage()); + } else { + LOG.error("[{}] Error performing authentication for HTTP", request.getRemoteAddr(), + throwable); + } + } catch (IOException e) { + LOG.error("Error while responding to HTTP request", e); + } finally { + asyncContext.complete(); + } + } else { + asyncContext.getRequest().setAttribute("do_filter", doFilter); + asyncContext.complete(); + } + }); + } + + private void runFilter(FilterChain chain, AsyncContext asyncContext) { + try { + chain.doFilter(asyncContext.getRequest(), asyncContext.getResponse()); + } catch (IOException | ServletException e) { + LOG.error("Error in HTTP filtering", e); + } finally { + asyncContext.complete(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthenticationServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthenticationServiceTest.java index 78ae046b0c8c8..f581f2fcdacf3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthenticationServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthenticationServiceTest.java @@ -102,7 +102,7 @@ public void testAuthenticationHttpRequestResponse() throws Exception { when(request.getRemoteAddr()).thenReturn("192.168.1.1"); when(request.getRemotePort()).thenReturn(8080); when(request.getHeader("X-Pulsar-Auth-Method-Name")).thenReturn("auth"); - boolean doFilter = service.authenticateHttpRequest(request, (HttpServletResponse) null); + boolean doFilter = service.authenticateHttpRequestAsync(request, (HttpServletResponse) null).get(); assertTrue(doFilter, "Authentication should have succeeded"); verify(request).setAttribute(AuthenticatedRoleAttributeName, s_authentication_success); verify(request).setAttribute(eq(AuthenticatedDataAttributeName), any(AuthenticationDataHttps.class)); @@ -122,7 +122,7 @@ public void testAuthenticationHttpRequestResponseWithMultipleProviders() throws when(requestDefaultAuthProvider.getRemoteAddr()).thenReturn("192.168.1.1"); when(requestDefaultAuthProvider.getRemotePort()).thenReturn(8080); when(requestDefaultAuthProvider.getHeader("X-Pulsar-Auth-Method-Name")).thenReturn("auth"); - doFilter = service.authenticateHttpRequest(requestDefaultAuthProvider, (HttpServletResponse) null); + doFilter = service.authenticateHttpRequestAsync(requestDefaultAuthProvider, (HttpServletResponse) null).get(); assertTrue(doFilter, "Authentication should have succeeded"); verify(requestDefaultAuthProvider).setAttribute(AuthenticatedRoleAttributeName, s_authentication_success); @@ -130,7 +130,7 @@ public void testAuthenticationHttpRequestResponseWithMultipleProviders() throws when(requestCustomAuthProvider.getRemoteAddr()).thenReturn("192.168.1.1"); when(requestCustomAuthProvider.getRemotePort()).thenReturn(8080); when(requestCustomAuthProvider.getHeader("X-Pulsar-Auth-Method-Name")).thenReturn("customAuthProvider"); - doFilter = service.authenticateHttpRequest(requestCustomAuthProvider, (HttpServletResponse) null); + doFilter = service.authenticateHttpRequestAsync(requestCustomAuthProvider, (HttpServletResponse) null).get(); assertTrue(doFilter, "Authentication should have succeeded"); verify(requestCustomAuthProvider).setAttribute(AuthenticatedRoleAttributeName, s_authentication_success); @@ -139,7 +139,7 @@ public void testAuthenticationHttpRequestResponseWithMultipleProviders() throws when(requestUnsupportedAuthProvider.getRemotePort()).thenReturn(8080); when(requestUnsupportedAuthProvider.getHeader("X-Pulsar-Auth-Method-Name")).thenReturn("unsupportedAuthProvider"); Assert.assertThrows(() -> - service.authenticateHttpRequest(requestUnsupportedAuthProvider, (HttpServletResponse) null)); + service.authenticateHttpRequestAsync(requestUnsupportedAuthProvider, (HttpServletResponse) null)); service.close(); } @@ -159,7 +159,7 @@ public void testAuthenticationHttpRequestResponseWithAnonymousRole() throws Exce HttpServletRequest requestCustomAuthProvider = mock(HttpServletRequest.class); when(requestCustomAuthProvider.getRemoteAddr()).thenReturn("192.168.1.1"); when(requestCustomAuthProvider.getRemotePort()).thenReturn(8080); - doFilter = service.authenticateHttpRequest(requestCustomAuthProvider, (HttpServletResponse) null); + doFilter = service.authenticateHttpRequestAsync(requestCustomAuthProvider, (HttpServletResponse) null).get(); assertTrue(doFilter, "Authentication should have succeeded"); verify(requestCustomAuthProvider).setAttribute(AuthenticatedRoleAttributeName, anonRole);