From a8a1db4463667dadcb3a302bfcc52d20e35b9f98 Mon Sep 17 00:00:00 2001 From: AprilWind <2100166581@qq.com> Date: Tue, 24 Jun 2025 14:37:02 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E4=BC=98=E5=8C=96SSE=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/sse/core/SseEmitterManager.java | 65 +++++++++---------- 1 file changed, 29 insertions(+), 36 deletions(-) diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java index bc19460f8..936851b3b 100644 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java @@ -1,6 +1,7 @@ package org.dromara.common.sse.core; import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.ObjectUtil; import lombok.extern.slf4j.Slf4j; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.common.sse.dto.SseMessageDto; @@ -24,6 +25,9 @@ public class SseEmitterManager { */ private final static String SSE_TOPIC = "global:sse"; + /** + * 本地内存缓存,存活用户连接 + */ private final static Map> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>(); /** @@ -43,32 +47,18 @@ public class SseEmitterManager { emitters.put(token, emitter); - // 当 emitter 完成、超时或发生错误时,从映射表中移除对应的 token - emitter.onCompletion(() -> { - SseEmitter remove = emitters.remove(token); - if (remove != null) { - remove.complete(); - } - }); - emitter.onTimeout(() -> { - SseEmitter remove = emitters.remove(token); - if (remove != null) { - remove.complete(); - } - }); - emitter.onError((e) -> { - SseEmitter remove = emitters.remove(token); - if (remove != null) { - remove.complete(); - } - }); + // 注册连接结束、超时、错误等统一清理逻辑 + Runnable callback = () -> disconnect(userId, token); + emitter.onCompletion(callback); + emitter.onTimeout(callback); + emitter.onError(e -> callback.run()); try { - // 向客户端发送一条连接成功的事件 emitter.send(SseEmitter.event().comment("connected")); + log.warn("SSE连接成功 userId={}, token={}", userId, token); } catch (IOException e) { - // 如果发送消息失败,则从映射表中移除 emitter - emitters.remove(token); + log.warn("SSE连接初始化失败 userId={}, token={}, 错误: {}", userId, token, e.getMessage()); + disconnect(userId, token); } return emitter; } @@ -85,13 +75,20 @@ public class SseEmitterManager { } Map emitters = USER_TOKEN_EMITTERS.get(userId); if (MapUtil.isNotEmpty(emitters)) { - try { - SseEmitter sseEmitter = emitters.get(token); - sseEmitter.send(SseEmitter.event().comment("disconnected")); - sseEmitter.complete(); - } catch (Exception ignore) { + SseEmitter sseEmitter = emitters.remove(token); + if (ObjectUtil.isNotNull(sseEmitter)) { + try { + sseEmitter.send(SseEmitter.event().comment("disconnected")); + } catch (IOException e) { + log.warn("SSE断开通知失败 userId={}, token={}", userId, token); + } finally { + sseEmitter.complete(); + log.warn("SSE连接断开 userId={}, token={}", userId, token); + } + } + if (emitters.isEmpty()) { + USER_TOKEN_EMITTERS.remove(userId); } - emitters.remove(token); } else { USER_TOKEN_EMITTERS.remove(userId); } @@ -115,16 +112,12 @@ public class SseEmitterManager { public void sendMessage(Long userId, String message) { Map emitters = USER_TOKEN_EMITTERS.get(userId); if (MapUtil.isNotEmpty(emitters)) { - for (Map.Entry entry : emitters.entrySet()) { + for (String token : emitters.keySet()) { try { - entry.getValue().send(SseEmitter.event() - .name("message") - .data(message)); + emitters.get(token).send(SseEmitter.event().name("message").data(message)); } catch (Exception e) { - SseEmitter remove = emitters.remove(entry.getKey()); - if (remove != null) { - remove.complete(); - } + log.warn("SSE消息发送失败 userId={}, token={}, error={}", userId, token, e.getMessage()); + disconnect(userId, token); } } } else {