diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java index 936851b3b..bc19460f8 100644 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java @@ -1,7 +1,6 @@ package org.dromara.common.sse.core; import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.ObjectUtil; import lombok.extern.slf4j.Slf4j; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.common.sse.dto.SseMessageDto; @@ -25,9 +24,6 @@ public class SseEmitterManager { */ private final static String SSE_TOPIC = "global:sse"; - /** - * 本地内存缓存,存活用户连接 - */ private final static Map> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>(); /** @@ -47,18 +43,32 @@ public class SseEmitterManager { emitters.put(token, emitter); - // 注册连接结束、超时、错误等统一清理逻辑 - Runnable callback = () -> disconnect(userId, token); - emitter.onCompletion(callback); - emitter.onTimeout(callback); - emitter.onError(e -> callback.run()); + // 当 emitter 完成、超时或发生错误时,从映射表中移除对应的 token + emitter.onCompletion(() -> { + SseEmitter remove = emitters.remove(token); + if (remove != null) { + remove.complete(); + } + }); + emitter.onTimeout(() -> { + SseEmitter remove = emitters.remove(token); + if (remove != null) { + remove.complete(); + } + }); + emitter.onError((e) -> { + SseEmitter remove = emitters.remove(token); + if (remove != null) { + remove.complete(); + } + }); try { + // 向客户端发送一条连接成功的事件 emitter.send(SseEmitter.event().comment("connected")); - log.warn("SSE连接成功 userId={}, token={}", userId, token); } catch (IOException e) { - log.warn("SSE连接初始化失败 userId={}, token={}, 错误: {}", userId, token, e.getMessage()); - disconnect(userId, token); + // 如果发送消息失败,则从映射表中移除 emitter + emitters.remove(token); } return emitter; } @@ -75,20 +85,13 @@ public class SseEmitterManager { } Map emitters = USER_TOKEN_EMITTERS.get(userId); if (MapUtil.isNotEmpty(emitters)) { - SseEmitter sseEmitter = emitters.remove(token); - if (ObjectUtil.isNotNull(sseEmitter)) { - try { - sseEmitter.send(SseEmitter.event().comment("disconnected")); - } catch (IOException e) { - log.warn("SSE断开通知失败 userId={}, token={}", userId, token); - } finally { - sseEmitter.complete(); - log.warn("SSE连接断开 userId={}, token={}", userId, token); - } - } - if (emitters.isEmpty()) { - USER_TOKEN_EMITTERS.remove(userId); + try { + SseEmitter sseEmitter = emitters.get(token); + sseEmitter.send(SseEmitter.event().comment("disconnected")); + sseEmitter.complete(); + } catch (Exception ignore) { } + emitters.remove(token); } else { USER_TOKEN_EMITTERS.remove(userId); } @@ -112,12 +115,16 @@ public class SseEmitterManager { public void sendMessage(Long userId, String message) { Map emitters = USER_TOKEN_EMITTERS.get(userId); if (MapUtil.isNotEmpty(emitters)) { - for (String token : emitters.keySet()) { + for (Map.Entry entry : emitters.entrySet()) { try { - emitters.get(token).send(SseEmitter.event().name("message").data(message)); + entry.getValue().send(SseEmitter.event() + .name("message") + .data(message)); } catch (Exception e) { - log.warn("SSE消息发送失败 userId={}, token={}, error={}", userId, token, e.getMessage()); - disconnect(userId, token); + SseEmitter remove = emitters.remove(entry.getKey()); + if (remove != null) { + remove.complete(); + } } } } else {