bucketConfigurationSupplier;
+ private final MessageIdRegistry messageIdRegistry;
+
+ /**
+ * Intercepts methods annotated with {@link WithUserContext}. Retrieves the currently
+ * authenticated user from the {@link SecurityContextHolder} and stores the username in
+ * {@link UserContext}. After the method execution, the user context is cleared.
+ *
+ * @param jp the proceeding join point for the intercepted method
+ * @return the result of the method invocation
+ * @throws UnauthorizedException if no authentication is found, the user is not authenticated,
+ * or the principal is the anonymous user
+ * @throws Throwable if the intercepted method throws an exception
+ */
+ @Around("@annotation(io.ylab.chat.aop.annotation.WithUserContext)")
+ public Object withUserContext(ProceedingJoinPoint jp) throws Throwable {
+ Authentication auth = SecurityContextHolder.getContext().getAuthentication();
+ if (auth == null || !auth.isAuthenticated() || "anonymousUser".equals(
+ auth.getPrincipal())) {
+ throw new UnauthorizedException("User not authenticated");
+ }
+
+ String username = auth.getName();
+ UserContext.setUserInfo(UserInfo.builder().username(username).build());
+
+ try {
+ return jp.proceed();
+ } finally {
+ UserContext.clear();
+ log.debug("User context cleared for {}", username);
+ }
+ }
+
+ /**
+ * Intercepts methods annotated with {@link Idempotent}.
+ *
+ * Ensures that the same message (identified by {@code messageId}) is processed only once. If
+ * the first argument of the intercepted method is a {@link ChatMessageDto}, its message ID is
+ * validated. A missing or empty message ID is generated automatically. The
+ * {@link MessageIdRegistry} is consulted to check if the message has already been processed.
+ *
+ * @param jp the proceeding join point for the intercepted method
+ * @return the result of the method invocation, or {@code null} if the message is a duplicate
+ * @throws Throwable if the intercepted method throws an exception
+ */
+ @Around("@annotation(io.ylab.chat.aop.annotation.Idempotent)")
+ public Object idempotent(ProceedingJoinPoint jp) throws Throwable {
+ Object[] args = jp.getArgs();
+ if (args.length == 0 || !(args[0] instanceof ChatMessageDto dto)) {
+ return jp.proceed();
+ }
+
+ String messageId = dto.getMessageId();
+ if (messageId == null || messageId.trim().isEmpty()) {
+ messageId = UUID.randomUUID().toString();
+ dto.setMessageId(messageId);
+ }
+
+ if (!messageIdRegistry.markAsProcessed(messageId)) {
+ log.warn("Duplicate message ignored: {}", messageId);
+ return null;
+ }
+ return jp.proceed();
+ }
+
+ /**
+ * Intercepts methods annotated with {@link RateLimited}. Applies rate limiting per user based
+ * on a token bucket algorithm. The bucket key is derived from the current username (or
+ * "anonymous" if none is set). If a token is successfully consumed, the method proceeds;
+ * otherwise, a {@link ResponseStatusException} with HTTP 429 (Too Many Requests) is thrown.
+ *
+ * @param jp the proceeding join point for the intercepted method
+ * @return the result of the method invocation if rate limit is not exceeded
+ * @throws ResponseStatusException with {@code TOO_MANY_REQUESTS} when the rate limit is
+ * exceeded, including the recommended retry-after interval
+ * @throws Throwable if the intercepted method throws an exception
+ */
+ @Around("@annotation(io.ylab.chat.aop.annotation.RateLimited)")
+ public Object rateLimited(ProceedingJoinPoint jp) throws Throwable {
+ String username = UserContext.getCurrentUsername();
+ if (username == null) {
+ username = "anonymous";
+ }
+
+ String bucketKey = "rate_limit:chat:message:" + username;
+ byte[] keyBytes = bucketKey.getBytes(StandardCharsets.UTF_8);
+
+ Bucket bucket = bucketProxyManager.builder()
+ .build(keyBytes, bucketConfigurationSupplier.get());
+
+ ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
+
+ if (probe.isConsumed()) {
+ log.debug("Allowed for {}, remaining: {}", username, probe.getRemainingTokens());
+ return jp.proceed();
+ }
+
+ long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(probe.getNanosToWaitForRefill());
+
+ log.warn("Rate limit exceeded for {}, retry after {}s", username, waitSeconds);
+
+ throw new ResponseStatusException(
+ HttpStatus.TOO_MANY_REQUESTS,
+ "Too many messages. Retry after " + waitSeconds + " seconds",
+ new RateLimitExceededException("Rate limit exceeded")
+ );
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/io/ylab/chat/aop/DegradationAspect.java b/src/main/java/io/ylab/chat/aop/DegradationAspect.java
new file mode 100644
index 0000000..43e721e
--- /dev/null
+++ b/src/main/java/io/ylab/chat/aop/DegradationAspect.java
@@ -0,0 +1,124 @@
+package io.ylab.chat.aop;
+
+import lombok.extern.slf4j.Slf4j;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.springframework.core.annotation.Order;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Aspect for handling system degradation in case of repeated failures in message persistence.
+ */
+@Slf4j
+@Aspect
+@Order(4)
+@Component
+public class DegradationAspect {
+
+ /**
+ * Flag indicating whether the system is in degraded mode.
+ */
+ private final AtomicBoolean degradedMode = new AtomicBoolean(false);
+
+ /**
+ * Counter for tracking consecutive failures in message persistence.
+ */
+ private final AtomicInteger failureCount = new AtomicInteger(0);
+
+ /**
+ * The threshold for consecutive failures before entering degraded mode.
+ */
+ private static final int FAILURE_THRESHOLD = 5;
+
+ /**
+ * Around advice for handling failures in the `saveAsync` method of
+ * {@code MessagePersistenceService}.
+ *
+ * @param joinPoint the {@link ProceedingJoinPoint} representing the intercepted method
+ * @return the result of the method execution, or {@code null} if persistence is skipped
+ * @throws Throwable if an error occurs during method execution
+ */
+ @Around("execution(* io.ylab.chat.service.MessagePersistenceService.saveAsync(..))")
+ public Object handlePersistenceFailure(ProceedingJoinPoint joinPoint) throws Throwable {
+ if (degradedMode.get()) {
+ log.warn("System in degraded mode - skipping persistence");
+ return null;
+ }
+ try {
+ Object result = joinPoint.proceed();
+
+ if (failureCount.get() > 0) {
+ failureCount.set(0);
+ log.info("Persistence recovered - failure count reset");
+ }
+
+ return result;
+
+ } catch (RejectedExecutionException e) {
+ handleFailure("Thread pool exhausted", e);
+ return null;
+
+ } catch (Exception e) {
+ handleFailure("Database persistence failed", e);
+ return null;
+ }
+ }
+
+ @Scheduled(fixedDelay = 60000)
+ public void attemptRecovery() {
+ if (degradedMode.get()) {
+ try {
+ if (checkDatabaseConnection()) {
+ recover();
+ log.info("Automatic recovery from degraded mode succeeded");
+ }
+ } catch (Exception e) {
+ log.debug("Recovery check failed, staying in degraded mode");
+ }
+ }
+ }
+
+ private boolean checkDatabaseConnection() {
+ return true;
+ }
+
+ /**
+ * Handles a failure in message persistence.
+ *
+ * @param reason the reason for the failure
+ * @param e the exception that caused the failure
+ */
+ private void handleFailure(String reason, Exception e) {
+ int failures = failureCount.incrementAndGet();
+
+ log.error("{} (failure {}/{}): {}", reason, failures, FAILURE_THRESHOLD, e.getMessage());
+
+ if (failures >= FAILURE_THRESHOLD && !degradedMode.get()) {
+ degradedMode.set(true);
+ log.error("ENTERING DEGRADED MODE - Chat continues without persistence");
+ }
+ }
+
+ /**
+ * Checks if the system is currently in degraded mode.
+ *
+ * @return {@code true} if the system is in degraded mode, {@code false} otherwise
+ */
+ public boolean isDegraded() {
+ return degradedMode.get();
+ }
+
+ /**
+ * Recovers the system from degraded mode.
+ */
+ public void recover() {
+ degradedMode.set(false);
+ failureCount.set(0);
+ log.info("System recovered from degraded mode");
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/io/ylab/chat/aop/annotation/Idempotent.java b/src/main/java/io/ylab/chat/aop/annotation/Idempotent.java
new file mode 100644
index 0000000..bc89dea
--- /dev/null
+++ b/src/main/java/io/ylab/chat/aop/annotation/Idempotent.java
@@ -0,0 +1,24 @@
+package io.ylab.chat.aop.annotation;
+
+import io.ylab.chat.aop.ChatGuardAspect;
+import io.ylab.chat.concurrency.MessageIdRegistry;
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Indicates that a method should be idempotent with respect to message processing. When this
+ * annotation is applied, the {@link ChatGuardAspect} ensures that the same message (identified by
+ * its {@code messageId}) is processed only once. If the method's first parameter is a
+ * {@code ChatMessageDto}, the aspect automatically generates a {@code messageId} if missing and
+ * uses {@link MessageIdRegistry} to detect duplicates. Duplicate messages are silently ignored (the
+ * method is not invoked).
+ */
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+public @interface Idempotent {
+
+}
\ No newline at end of file
diff --git a/src/main/java/io/ylab/chat/aop/annotation/RateLimited.java b/src/main/java/io/ylab/chat/aop/annotation/RateLimited.java
new file mode 100644
index 0000000..8e33be8
--- /dev/null
+++ b/src/main/java/io/ylab/chat/aop/annotation/RateLimited.java
@@ -0,0 +1,22 @@
+package io.ylab.chat.aop.annotation;
+
+import io.ylab.chat.aop.ChatGuardAspect;
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Applies rate limiting to the annotated method. Methods marked with this annotation are protected
+ * by a token bucket rate limiter, managed by the {@link ChatGuardAspect}. The rate limit is applied
+ * per user (based on {@code UserContext.getCurrentUsername()}) using a configurable bucket from
+ * {@code BucketConfigurationSupplier}. If the rate limit is exceeded, the method invocation is
+ * blocked and an HTTP 429 (Too Many Requests) response is thrown.
+ */
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+public @interface RateLimited {
+
+}
\ No newline at end of file
diff --git a/src/main/java/io/ylab/chat/aop/annotation/WithUserContext.java b/src/main/java/io/ylab/chat/aop/annotation/WithUserContext.java
new file mode 100644
index 0000000..9f7be98
--- /dev/null
+++ b/src/main/java/io/ylab/chat/aop/annotation/WithUserContext.java
@@ -0,0 +1,22 @@
+package io.ylab.chat.aop.annotation;
+
+import io.ylab.chat.aop.ChatGuardAspect;
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Establishes user context before method execution and clears it afterwards. When this annotation
+ * is applied, the {@link ChatGuardAspect} extracts the currently authenticated user from
+ * {@code SecurityContextHolder}, validates that the user is authenticated (not anonymous), and sets
+ * the username into {@code UserContext}. After the method completes, the user context is cleared to
+ * avoid leakage across threads.
+ */
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+public @interface WithUserContext {
+
+}
\ No newline at end of file
diff --git a/src/main/java/io/ylab/chat/concurrency/MessageIdRegistry.java b/src/main/java/io/ylab/chat/concurrency/MessageIdRegistry.java
new file mode 100644
index 0000000..eaef1d3
--- /dev/null
+++ b/src/main/java/io/ylab/chat/concurrency/MessageIdRegistry.java
@@ -0,0 +1,73 @@
+package io.ylab.chat.concurrency;
+
+import org.springframework.stereotype.Component;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Component that tracks processed message IDs to prevent duplicate processing.
+ */
+@Component
+public class MessageIdRegistry {
+
+ /**
+ * Time-to-live for message IDs in milliseconds (5 minutes).
+ */
+ private static final long TTL_MS = 300_000;
+
+ /**
+ * Map storing processed message IDs and their processing timestamps.
+ */
+ private final Map processedMessages = new ConcurrentHashMap<>();
+ /**
+ * Scheduled executor service for periodic cleanup of expired message IDs.
+ */
+ private final ScheduledExecutorService cleanupExecutor = Executors.newSingleThreadScheduledExecutor();
+
+ /**
+ * Constructs a new {@code MessageIdRegistry} and starts the periodic cleanup task.
+ */
+ public MessageIdRegistry() {
+ cleanupExecutor.scheduleAtFixedRate(this::cleanup, 1, 1, TimeUnit.MINUTES);
+ }
+
+ /**
+ * Checks if a message ID has already been processed.
+ *
+ * @param messageId the message ID to check
+ * @return {@code true} if the message ID has been processed, {@code false} otherwise
+ */
+ public boolean isProcessed(String messageId) {
+ return processedMessages.containsKey(messageId);
+ }
+
+ /**
+ * Marks a message ID as processed by recording the current timestamp.
+ *
+ * @param messageId the message ID to mark as processed
+ * @return {@code true} if the message ID was newly recorded, {@code false} if it was already
+ * present
+ */
+ public boolean markAsProcessed(String messageId) {
+ Long existing = processedMessages.putIfAbsent(messageId, System.currentTimeMillis());
+ return existing == null;
+ }
+
+ /**
+ * Removes message IDs that have exceeded their time-to-live (TTL).
+ */
+ private void cleanup() {
+ long now = System.currentTimeMillis();
+ processedMessages.entrySet().removeIf(entry -> now - entry.getValue() > TTL_MS);
+ }
+
+ /**
+ * Shuts down the scheduled cleanup executor.
+ */
+ public void shutdown() {
+ cleanupExecutor.shutdown();
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/io/ylab/chat/concurrency/OnlineUserRegistry.java b/src/main/java/io/ylab/chat/concurrency/OnlineUserRegistry.java
new file mode 100644
index 0000000..3454bdb
--- /dev/null
+++ b/src/main/java/io/ylab/chat/concurrency/OnlineUserRegistry.java
@@ -0,0 +1,96 @@
+package io.ylab.chat.concurrency;
+
+import lombok.*;
+import org.springframework.stereotype.Component;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Component for managing the registry of online users in a thread-safe manner.
+ */
+@Component
+public class OnlineUserRegistry {
+
+ /**
+ * A thread-safe map to store online users and their session information. The key is the
+ * username, and the value is a {@link SessionInfo} object.
+ */
+ private final Map onlineUsers = new ConcurrentHashMap<>();
+
+ /**
+ * Registers a user as online by storing their session information.
+ *
+ * @param username the username of the user
+ * @param sessionId the session ID associated with the user
+ */
+ public void register(String username, String sessionId) {
+ SessionInfo info = SessionInfo.builder()
+ .username(username)
+ .sessionId(sessionId)
+ .connectedAt(System.currentTimeMillis())
+ .build();
+ onlineUsers.put(username, info);
+ }
+
+ /**
+ * Unregisters a user, removing them from the online users registry.
+ *
+ * @param username the username of the user to unregister
+ */
+ public void unregister(String username) {
+ onlineUsers.remove(username);
+ }
+
+ /**
+ * Checks if a user is currently online.
+ *
+ * @param username the username to check
+ * @return {@code true} if the user is online, {@code false} otherwise
+ */
+ public boolean isOnline(String username) {
+ return onlineUsers.containsKey(username);
+ }
+
+ /**
+ * Retrieves the set of usernames of all online users.
+ *
+ * @return a {@link Set} containing the usernames of online users
+ */
+ public Set getOnlineUsers() {
+ return new HashSet<>(onlineUsers.keySet());
+ }
+
+ /**
+ * Retrieves the total count of online users.
+ *
+ * @return the number of online users
+ */
+ public int getOnlineCount() {
+ return onlineUsers.size();
+ }
+
+ /**
+ * Inner class representing session information for an online user.
+ */
+ @Data
+ @Builder
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class SessionInfo {
+
+ /**
+ * The username of the user.
+ */
+ private String username;
+
+ /**
+ * The session ID associated with the user.
+ */
+ private String sessionId;
+
+ /**
+ * The timestamp (in milliseconds) when the user connected.
+ */
+ private long connectedAt;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/io/ylab/chat/config/AsyncConfig.java b/src/main/java/io/ylab/chat/config/AsyncConfig.java
new file mode 100644
index 0000000..c8e1db3
--- /dev/null
+++ b/src/main/java/io/ylab/chat/config/AsyncConfig.java
@@ -0,0 +1,40 @@
+package io.ylab.chat.config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * Configuration class for setting up asynchronous task execution.
+ */
+@Configuration
+@Slf4j
+public class AsyncConfig {
+
+ /**
+ * Creates and configures a {@link ThreadPoolTaskExecutor} for chat-related asynchronous tasks.
+ *
+ * @return the configured {@link ThreadPoolTaskExecutor} instance
+ */
+ @Bean(name = "chatExecutor")
+ public ThreadPoolTaskExecutor chatExecutor() {
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+ executor.setCorePoolSize(8);
+ executor.setMaxPoolSize(16);
+ executor.setQueueCapacity(100);
+ executor.setThreadNamePrefix("chat-msg-");
+ executor.setRejectedExecutionHandler(
+ new ThreadPoolExecutor.CallerRunsPolicy());
+ executor.setThreadFactory(r -> {
+ Thread t = new Thread(r);
+ t.setUncaughtExceptionHandler((thread, e) ->
+ log.error("Uncaught exception in thread {}: {}", thread.getName(), e.getMessage(),
+ e));
+ return t;
+ });
+ executor.initialize();
+ return executor;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/io/ylab/chat/config/WebSocketConfig.java b/src/main/java/io/ylab/chat/config/WebSocketConfig.java
new file mode 100644
index 0000000..0b3e422
--- /dev/null
+++ b/src/main/java/io/ylab/chat/config/WebSocketConfig.java
@@ -0,0 +1,144 @@
+package io.ylab.chat.config;
+
+import io.ylab.chat.util.JwtUtil;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.simp.config.ChannelRegistration;
+import org.springframework.messaging.simp.config.MessageBrokerRegistry;
+import org.springframework.messaging.simp.stomp.StompCommand;
+import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
+import org.springframework.messaging.support.ChannelInterceptor;
+import org.springframework.messaging.support.MessageHeaderAccessor;
+import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
+import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
+import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
+import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
+import java.security.Principal;
+import java.util.ArrayList;
+
+/**
+ * Configuration class for setting up WebSocket messaging with Spring.
+ */
+@Slf4j
+@Configuration
+@EnableWebSocketMessageBroker
+@RequiredArgsConstructor
+public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
+
+ private final JwtUtil jwtUtil;
+
+ /**
+ * Configures the message broker for WebSocket communication.
+ *
+ * @param config the {@link MessageBrokerRegistry} to configure
+ */
+ @Override
+ public void configureMessageBroker(MessageBrokerRegistry config) {
+ config.enableSimpleBroker("/topic", "/queue");
+ config.setApplicationDestinationPrefixes("/app");
+ config.setUserDestinationPrefix("/user");
+ }
+
+ /**
+ * Registers STOMP endpoints for WebSocket communication.
+ *
+ * @param registry the {@link StompEndpointRegistry} to configure
+ */
+ @Override
+ public void registerStompEndpoints(StompEndpointRegistry registry) {
+ registry.addEndpoint("/ws")
+ .setAllowedOriginPatterns("*")
+ .withSockJS();
+ }
+
+ private Principal authenticateFromToken(String authHeader) {
+ if (authHeader == null || !authHeader.startsWith("Bearer ")) {
+ return null;
+ }
+ String token = authHeader.substring(7);
+ try {
+ String username = jwtUtil.extractUsername(token);
+ if (jwtUtil.validateToken(token, username)) {
+ return () -> username;
+ }
+ } catch (Exception e) {
+ log.debug("JWT validation failed: {}", e.getMessage());
+ }
+ return null;
+ }
+
+ /**
+ * Configures the client inbound channel to intercept and process WebSocket messages.
+ *
+ * @param registration the {@link ChannelRegistration} to configure
+ */
+ @Override
+ public void configureClientInboundChannel(ChannelRegistration registration) {
+ registration.interceptors(new ChannelInterceptor() {
+ @Override
+ public Message> preSend(Message> message, MessageChannel channel) {
+ StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message,
+ StompHeaderAccessor.class);
+
+ log.debug("WebSocket Interceptor: Command = {}, User = {}",
+ accessor.getCommand(), accessor.getUser());
+
+ if (StompCommand.CONNECT.equals(accessor.getCommand())) {
+ String authToken = accessor.getFirstNativeHeader("Authorization");
+ Principal principal = authenticateFromToken(authToken);
+
+ if (principal != null) {
+ accessor.setUser(principal);
+ UsernamePasswordAuthenticationToken auth =
+ new UsernamePasswordAuthenticationToken(principal.getName(), null,
+ new ArrayList<>());
+ SecurityContextHolder.getContext().setAuthentication(auth);
+ } else {
+ throw new RuntimeException("Invalid JWT token");
+ }
+ } else if (StompCommand.SUBSCRIBE.equals(accessor.getCommand()) ||
+ StompCommand.SEND.equals(accessor.getCommand())) {
+
+ if (accessor.getUser() == null) {
+ Authentication auth = SecurityContextHolder.getContext()
+ .getAuthentication();
+ if (auth != null && auth.isAuthenticated()
+ && !(auth.getPrincipal() instanceof String)) {
+ Principal principal = () -> auth.getName();
+ accessor.setUser(principal);
+ }
+ }
+
+ if (accessor.getUser() == null) {
+ String authToken = accessor.getFirstNativeHeader("Authorization");
+ if (authToken != null && authToken.startsWith("Bearer ")) {
+ String token = authToken.substring(7);
+ try {
+ String username = jwtUtil.extractUsername(token);
+ if (jwtUtil.validateToken(token, username)) {
+ Principal principal = () -> username;
+ accessor.setUser(principal);
+ UsernamePasswordAuthenticationToken authTokenObj =
+ new UsernamePasswordAuthenticationToken(principal, null,
+ new ArrayList<>());
+ SecurityContextHolder.getContext()
+ .setAuthentication(authTokenObj);
+ }
+ } catch (Exception e) {
+ log.error("WebSocket {}: Invalid JWT token", accessor.getCommand(),
+ e);
+ }
+ }
+ }
+ }
+
+ return message;
+ }
+ });
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/io/ylab/chat/config/ratelimit/RateLimiterConfig.java b/src/main/java/io/ylab/chat/config/ratelimit/RateLimiterConfig.java
new file mode 100644
index 0000000..d9f5bdf
--- /dev/null
+++ b/src/main/java/io/ylab/chat/config/ratelimit/RateLimiterConfig.java
@@ -0,0 +1,43 @@
+package io.ylab.chat.config.ratelimit;
+
+import io.github.bucket4j.Bandwidth;
+import io.github.bucket4j.BucketConfiguration;
+import io.github.bucket4j.distributed.proxy.ProxyManager;
+import io.github.bucket4j.redis.lettuce.cas.LettuceBasedProxyManager;
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.codec.ByteArrayCodec;
+import java.time.Duration;
+import java.util.function.Supplier;
+import lombok.RequiredArgsConstructor;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@RequiredArgsConstructor
+@EnableConfigurationProperties(RateLimiterProperties.class)
+public class RateLimiterConfig {
+
+ private final RedisClient redisClient;
+ private final RateLimiterProperties properties;
+
+ @Bean
+ public ProxyManager bucketProxyManager() {
+ StatefulRedisConnection connection = redisClient.connect(
+ ByteArrayCodec.INSTANCE);
+ return LettuceBasedProxyManager.builderFor(connection)
+ .build();
+ }
+
+ @Bean
+ public Supplier messageBucketConfiguration() {
+ return () -> BucketConfiguration.builder()
+ .addLimit(Bandwidth.builder()
+ .capacity(properties.getMessages().getCapacity())
+ .refillGreedy(properties.getMessages().getRefillTokens(),
+ Duration.ofSeconds(properties.getMessages().getRefillDurationSeconds()))
+ .build())
+ .build();
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/io/ylab/chat/config/ratelimit/RateLimiterProperties.java b/src/main/java/io/ylab/chat/config/ratelimit/RateLimiterProperties.java
new file mode 100644
index 0000000..81245a8
--- /dev/null
+++ b/src/main/java/io/ylab/chat/config/ratelimit/RateLimiterProperties.java
@@ -0,0 +1,21 @@
+package io.ylab.chat.config.ratelimit;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@Getter
+@Setter
+@ConfigurationProperties(prefix = "rate-limiter")
+public class RateLimiterProperties {
+
+ private Messages messages = new Messages();
+
+ @Getter
+ @Setter
+ public static class Messages {
+ private long capacity;
+ private long refillTokens;
+ private long refillDurationSeconds;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/io/ylab/chat/config/redis/RedisConfig.java b/src/main/java/io/ylab/chat/config/redis/RedisConfig.java
new file mode 100644
index 0000000..7d0bed1
--- /dev/null
+++ b/src/main/java/io/ylab/chat/config/redis/RedisConfig.java
@@ -0,0 +1,105 @@
+package io.ylab.chat.config.redis;
+
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisURI;
+import io.ylab.chat.entity.MessageEntity;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
+
+/**
+ * Configuration class for Redis connections and serialization. This class sets up a reactive
+ * Lettuce-based Redis client and two {@link RedisTemplate} beans for generic object storage and
+ * typed message entity storage. Connection parameters (host, port, password) are read from Spring
+ * environment properties with sensible defaults.
+ *
+ * @see RedisClient
+ * @see RedisTemplate
+ */
+@Slf4j
+@Configuration
+public class RedisConfig {
+
+ @Value("${spring.data.redis.host:localhost}")
+ private String redisHost;
+
+ @Value("${spring.data.redis.port:6379}")
+ private int redisPort;
+
+ @Value("${spring.data.redis.password:}")
+ private String redisPassword;
+
+ /**
+ * Creates a Lettuce-based Redis client with the configured host, port, and optional password.
+ * The client is responsible for managing low-level connections to the Redis server. The
+ * {@code destroyMethod = "shutdown"} ensures proper resource cleanup when the application
+ * context is closed.
+ *
+ * @return a configured {@link RedisClient} instance
+ */
+ @Bean(destroyMethod = "shutdown")
+ public RedisClient lettuceRedisClient() {
+ RedisURI.Builder builder = RedisURI.builder()
+ .withHost(redisHost)
+ .withPort(redisPort);
+ if (!redisPassword.isBlank()) {
+ builder.withPassword(redisPassword.toCharArray());
+ }
+ return RedisClient.create(builder.build());
+ }
+
+ /**
+ * Creates a generic {@link RedisTemplate} for storing and retrieving arbitrary Java objects.
+ * Keys and hash keys are serialized as UTF-8 strings using {@link StringRedisSerializer}.
+ * Values and hash values are serialized as JSON using {@link Jackson2JsonRedisSerializer} with
+ * {@link Object} as the target type, allowing storage of any serializable object. This template
+ * is suitable for general-purpose caching or data storage where the exact value type may vary.
+ *
+ * @param factory the Redis connection factory (auto-configured by Spring Boot)
+ * @return a configured {@link RedisTemplate} for {@code String, Object} pairs
+ */
+ @Bean
+ public RedisTemplate redisTemplate(RedisConnectionFactory factory) {
+ RedisTemplate template = new RedisTemplate<>();
+ template.setConnectionFactory(factory);
+ template.setKeySerializer(new StringRedisSerializer());
+ template.setHashKeySerializer(new StringRedisSerializer());
+
+ Jackson2JsonRedisSerializer