Revert "update 优化SSE连接"

This reverts commit a8a1db4463.
This commit is contained in:
疯狂的狮子Li 2025-06-24 06:51:28 +00:00 committed by Gitee
parent a8a1db4463
commit 1598447f6b
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F

View File

@ -1,7 +1,6 @@
package org.dromara.common.sse.core; package org.dromara.common.sse.core;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.dromara.common.redis.utils.RedisUtils; import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.common.sse.dto.SseMessageDto; import org.dromara.common.sse.dto.SseMessageDto;
@ -25,9 +24,6 @@ public class SseEmitterManager {
*/ */
private final static String SSE_TOPIC = "global:sse"; private final static String SSE_TOPIC = "global:sse";
/**
* 本地内存缓存存活用户连接
*/
private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>(); private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
/** /**
@ -47,18 +43,32 @@ public class SseEmitterManager {
emitters.put(token, emitter); emitters.put(token, emitter);
// 注册连接结束超时错误等统一清理逻辑 // emitter 完成超时或发生错误时从映射表中移除对应的 token
Runnable callback = () -> disconnect(userId, token); emitter.onCompletion(() -> {
emitter.onCompletion(callback); SseEmitter remove = emitters.remove(token);
emitter.onTimeout(callback); if (remove != null) {
emitter.onError(e -> callback.run()); remove.complete();
}
});
emitter.onTimeout(() -> {
SseEmitter remove = emitters.remove(token);
if (remove != null) {
remove.complete();
}
});
emitter.onError((e) -> {
SseEmitter remove = emitters.remove(token);
if (remove != null) {
remove.complete();
}
});
try { try {
// 向客户端发送一条连接成功的事件
emitter.send(SseEmitter.event().comment("connected")); emitter.send(SseEmitter.event().comment("connected"));
log.warn("SSE连接成功 userId={}, token={}", userId, token);
} catch (IOException e) { } catch (IOException e) {
log.warn("SSE连接初始化失败 userId={}, token={}, 错误: {}", userId, token, e.getMessage()); // 如果发送消息失败则从映射表中移除 emitter
disconnect(userId, token); emitters.remove(token);
} }
return emitter; return emitter;
} }
@ -75,20 +85,13 @@ public class SseEmitterManager {
} }
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId); Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
if (MapUtil.isNotEmpty(emitters)) { if (MapUtil.isNotEmpty(emitters)) {
SseEmitter sseEmitter = emitters.remove(token); try {
if (ObjectUtil.isNotNull(sseEmitter)) { SseEmitter sseEmitter = emitters.get(token);
try { sseEmitter.send(SseEmitter.event().comment("disconnected"));
sseEmitter.send(SseEmitter.event().comment("disconnected")); sseEmitter.complete();
} catch (IOException e) { } catch (Exception ignore) {
log.warn("SSE断开通知失败 userId={}, token={}", userId, token);
} finally {
sseEmitter.complete();
log.warn("SSE连接断开 userId={}, token={}", userId, token);
}
}
if (emitters.isEmpty()) {
USER_TOKEN_EMITTERS.remove(userId);
} }
emitters.remove(token);
} else { } else {
USER_TOKEN_EMITTERS.remove(userId); USER_TOKEN_EMITTERS.remove(userId);
} }
@ -112,12 +115,16 @@ public class SseEmitterManager {
public void sendMessage(Long userId, String message) { public void sendMessage(Long userId, String message) {
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId); Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
if (MapUtil.isNotEmpty(emitters)) { if (MapUtil.isNotEmpty(emitters)) {
for (String token : emitters.keySet()) { for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {
try { try {
emitters.get(token).send(SseEmitter.event().name("message").data(message)); entry.getValue().send(SseEmitter.event()
.name("message")
.data(message));
} catch (Exception e) { } catch (Exception e) {
log.warn("SSE消息发送失败 userId={}, token={}, error={}", userId, token, e.getMessage()); SseEmitter remove = emitters.remove(entry.getKey());
disconnect(userId, token); if (remove != null) {
remove.complete();
}
} }
} }
} else { } else {