Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.concurrent.CompletableFuture;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import javax.servlet.AsyncContext;
import javax.servlet.ServletRequest;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand Down Expand Up @@ -135,8 +137,16 @@ default AuthenticationState newHttpAuthState(HttpServletRequest request)
*/
default CompletableFuture<Boolean> authenticateHttpRequestAsync(HttpServletRequest request,
HttpServletResponse response) {
AsyncContext ctx = request.getAsyncContext();
try {
return CompletableFuture.completedFuture(this.authenticateHttpRequest(request, response));
final AuthenticationDataSource authDataSource = newHttpAuthState(request).getAuthDataSource();
return authenticateAsync(authDataSource)
.thenApply(role -> {
ServletRequest r = ctx.getRequest();
r.setAttribute(AuthenticatedRoleAttributeName, role);
r.setAttribute(AuthenticatedDataAttributeName, authDataSource);
return true;
});
} catch (Exception e) {
return FutureUtil.failedFuture(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.servlet.http.HttpServletRequest;
Expand Down Expand Up @@ -102,64 +103,72 @@ private AuthenticationProvider getAuthProvider(String authMethodName) throws Aut
return providerToUse;
}

public boolean authenticateHttpRequest(HttpServletRequest request, HttpServletResponse response)
throws Exception {
/**
*
* @param request
* @param response
* @return
*/
public CompletableFuture<Boolean> authenticateHttpRequestAsync(HttpServletRequest request,
HttpServletResponse response) {
String authMethodName = getAuthMethodName(request);
authMethodName = authMethodName == null ? "BasicAuthentication" : authMethodName;
if (authMethodName == null
&& SaslConstants.SASL_TYPE_VALUE.equalsIgnoreCase(request.getHeader(SaslConstants.SASL_HEADER_TYPE))) {
// This edge case must be handled because the Pulsar SASL implementation does not add the
// X-Pulsar-Auth-Method-Name header.
authMethodName = SaslConstants.AUTH_METHOD_NAME;
}
if (authMethodName != null) {
AuthenticationProvider providerToUse = getAuthProvider(authMethodName);
try {
return providerToUse.authenticateHttpRequest(request, response);
} catch (AuthenticationException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Authentication failed for provider " + providerToUse.getAuthMethodName() + " : "
+ e.getMessage(), e);
}
throw e;
// if (authMethodName != null) {
AuthenticationProvider providerToUse = providers.get(authMethodName);
if (providerToUse == null) {
return CompletableFuture.failedFuture(new AuthenticationException(
String.format("Unsupported authentication method: [%s].", authMethodName)));
}
} else {
for (AuthenticationProvider provider : providers.values()) {
try {
return provider.authenticateHttpRequest(request, response);
} catch (AuthenticationException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Authentication failed for provider " + provider.getAuthMethodName() + ": "
+ e.getMessage(), e);
}
// Ignore the exception because we don't know which authentication method is expected here.
}
}
// No authentication provided
if (!providers.isEmpty()) {
if (StringUtils.isNotBlank(anonymousUserRole)) {
request.setAttribute(AuthenticatedRoleAttributeName, anonymousUserRole);
request.setAttribute(AuthenticatedDataAttributeName, new AuthenticationDataHttps(request));
return true;
}
// If at least a provider was configured, then the authentication needs to be provider
throw new AuthenticationException("Authentication required");
} else {
// No authentication required
return true;
}
}
return providerToUse.authenticateHttpRequestAsync(request, response);
// todo how to handle exceptional case?
// } else {
// for (AuthenticationProvider provider : providers.values()) {
// try {
// return provider.authenticateHttpRequest(request, response);
// } catch (AuthenticationException e) {
// if (LOG.isDebugEnabled()) {
// LOG.debug("Authentication failed for provider " + provider.getAuthMethodName() + ": "
// + e.getMessage(), e);
// }
// // Ignore the exception because we don't know which authentication method is expected here.
// }
// }
// // No authentication provided
// if (!providers.isEmpty()) {
// if (StringUtils.isNotBlank(anonymousUserRole)) {
// request.setAttribute(AuthenticatedRoleAttributeName, anonymousUserRole);
// request.setAttribute(AuthenticatedDataAttributeName, new AuthenticationDataHttps(request));
// return true;
// }
// // If at least a provider was configured, then the authentication needs to be provider
// throw new AuthenticationException("Authentication required");
// } else {
// // No authentication required
// return true;
// }
// }
}

/**
* @deprecated use {@link #authenticateHttpRequest(HttpServletRequest, HttpServletResponse)}
* @deprecated use {@link #authenticateHttpRequestAsync(HttpServletRequest, HttpServletResponse)}
*/
@Deprecated(since = "2.12.0")
public String authenticateHttpRequest(HttpServletRequest request, AuthenticationDataSource authData)
throws AuthenticationException {
String authMethodName = getAuthMethodName(request);

if (authMethodName != null) {
AuthenticationProvider providerToUse = getAuthProvider(authMethodName);
AuthenticationProvider providerToUse = providers.get(authMethodName);
if (providerToUse == null) {
throw new AuthenticationException(
String.format("Unsupported authentication method: [%s].", authMethodName));
}
try {
if (authData == null) {
AuthenticationState authenticationState = providerToUse.newHttpAuthState(request);
Expand Down Expand Up @@ -205,7 +214,7 @@ public String authenticateHttpRequest(HttpServletRequest request, Authentication
/**
* Mark this function as deprecated, it is recommended to use a method with the AuthenticationDataSource
* signature to implement it.
* @deprecated use {@link #authenticateHttpRequest(HttpServletRequest, HttpServletResponse)}.
* @deprecated use {@link #authenticateHttpRequestAsync(HttpServletRequest, HttpServletResponse)}.
*/
@Deprecated
public String authenticateHttpRequest(HttpServletRequest request) throws AuthenticationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import java.io.IOException;
import javax.naming.AuthenticationException;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
Expand Down Expand Up @@ -52,20 +55,69 @@ public AuthenticationFilter(AuthenticationService authenticationService) {
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
try {
boolean doFilter = authenticationService
.authenticateHttpRequest((HttpServletRequest) request, (HttpServletResponse) response);
if (doFilter) {
chain.doFilter(request, response);
if (request.getAttribute(AuthenticatedRoleAttributeName) != null) {
chain.doFilter(request, response);
return;
}
AsyncContext asyncContext = request.startAsync();
asyncContext.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent event) throws IOException {
try {
chain.doFilter(event.getSuppliedRequest(), event.getSuppliedResponse());
} catch (ServletException e) {
throw new RuntimeException(e);
}
}
} catch (Exception e) {
HttpServletResponse httpResponse = (HttpServletResponse) response;
httpResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Authentication required");
if (e instanceof AuthenticationException) {
LOG.warn("[{}] Failed to authenticate HTTP request: {}", request.getRemoteAddr(), e.getMessage());
} else {
LOG.error("[{}] Error performing authentication for HTTP", request.getRemoteAddr(), e);

@Override
public void onTimeout(AsyncEvent event) throws IOException {

}

@Override
public void onError(AsyncEvent event) throws IOException {

}

@Override
public void onStartAsync(AsyncEvent event) throws IOException {

}
});
authenticationService
.authenticateHttpRequestAsync((HttpServletRequest) request, (HttpServletResponse) response)
.whenComplete((doFilter, throwable) -> {
if (throwable != null) {
try {
HttpServletResponse httpResponse = (HttpServletResponse) asyncContext.getResponse();
httpResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Authentication required");
if (throwable instanceof AuthenticationException) {
LOG.warn("[{}] Failed to authenticate HTTP request: {}", request.getRemoteAddr(),
throwable.getMessage());
} else {
LOG.error("[{}] Error performing authentication for HTTP", request.getRemoteAddr(),
throwable);
}
} catch (IOException e) {
LOG.error("Error while responding to HTTP request", e);
} finally {
asyncContext.complete();
}
} else {
asyncContext.getRequest().setAttribute("do_filter", doFilter);
asyncContext.complete();
}
});
}

private void runFilter(FilterChain chain, AsyncContext asyncContext) {
try {
chain.doFilter(asyncContext.getRequest(), asyncContext.getResponse());
} catch (IOException | ServletException e) {
LOG.error("Error in HTTP filtering", e);
} finally {
asyncContext.complete();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void testAuthenticationHttpRequestResponse() throws Exception {
when(request.getRemoteAddr()).thenReturn("192.168.1.1");
when(request.getRemotePort()).thenReturn(8080);
when(request.getHeader("X-Pulsar-Auth-Method-Name")).thenReturn("auth");
boolean doFilter = service.authenticateHttpRequest(request, (HttpServletResponse) null);
boolean doFilter = service.authenticateHttpRequestAsync(request, (HttpServletResponse) null).get();
assertTrue(doFilter, "Authentication should have succeeded");
verify(request).setAttribute(AuthenticatedRoleAttributeName, s_authentication_success);
verify(request).setAttribute(eq(AuthenticatedDataAttributeName), any(AuthenticationDataHttps.class));
Expand All @@ -122,15 +122,15 @@ public void testAuthenticationHttpRequestResponseWithMultipleProviders() throws
when(requestDefaultAuthProvider.getRemoteAddr()).thenReturn("192.168.1.1");
when(requestDefaultAuthProvider.getRemotePort()).thenReturn(8080);
when(requestDefaultAuthProvider.getHeader("X-Pulsar-Auth-Method-Name")).thenReturn("auth");
doFilter = service.authenticateHttpRequest(requestDefaultAuthProvider, (HttpServletResponse) null);
doFilter = service.authenticateHttpRequestAsync(requestDefaultAuthProvider, (HttpServletResponse) null).get();
assertTrue(doFilter, "Authentication should have succeeded");
verify(requestDefaultAuthProvider).setAttribute(AuthenticatedRoleAttributeName, s_authentication_success);

HttpServletRequest requestCustomAuthProvider = mock(HttpServletRequest.class);
when(requestCustomAuthProvider.getRemoteAddr()).thenReturn("192.168.1.1");
when(requestCustomAuthProvider.getRemotePort()).thenReturn(8080);
when(requestCustomAuthProvider.getHeader("X-Pulsar-Auth-Method-Name")).thenReturn("customAuthProvider");
doFilter = service.authenticateHttpRequest(requestCustomAuthProvider, (HttpServletResponse) null);
doFilter = service.authenticateHttpRequestAsync(requestCustomAuthProvider, (HttpServletResponse) null).get();
assertTrue(doFilter, "Authentication should have succeeded");
verify(requestCustomAuthProvider).setAttribute(AuthenticatedRoleAttributeName, s_authentication_success);

Expand All @@ -139,7 +139,7 @@ public void testAuthenticationHttpRequestResponseWithMultipleProviders() throws
when(requestUnsupportedAuthProvider.getRemotePort()).thenReturn(8080);
when(requestUnsupportedAuthProvider.getHeader("X-Pulsar-Auth-Method-Name")).thenReturn("unsupportedAuthProvider");
Assert.assertThrows(() ->
service.authenticateHttpRequest(requestUnsupportedAuthProvider, (HttpServletResponse) null));
service.authenticateHttpRequestAsync(requestUnsupportedAuthProvider, (HttpServletResponse) null));

service.close();
}
Expand All @@ -159,7 +159,7 @@ public void testAuthenticationHttpRequestResponseWithAnonymousRole() throws Exce
HttpServletRequest requestCustomAuthProvider = mock(HttpServletRequest.class);
when(requestCustomAuthProvider.getRemoteAddr()).thenReturn("192.168.1.1");
when(requestCustomAuthProvider.getRemotePort()).thenReturn(8080);
doFilter = service.authenticateHttpRequest(requestCustomAuthProvider, (HttpServletResponse) null);
doFilter = service.authenticateHttpRequestAsync(requestCustomAuthProvider, (HttpServletResponse) null).get();
assertTrue(doFilter, "Authentication should have succeeded");
verify(requestCustomAuthProvider).setAttribute(AuthenticatedRoleAttributeName, anonRole);

Expand Down