SpringBoot使用SSE
1.什么是SSE?
大家都知道HTTP协议是单向请求的,只能客户端向服务端发送请求,服务端收到请求后响应客户端;然而项目中有时需要服务端主动向客户端发送消息,通常我们都使用WebSocket来实现这一功能,WebSocket是双向通信的即客户端和服务端可以相互发送数据;有时我们仅仅需要服务端能够持续发送数据给客户端,这时我们可以使用SSE;
SSE 的工作原理
SSE (Server-Sent Events) 是一种使服务器能实时、单向地发送信息到客户端的技术。与 WebSocket 不同的是,SSE 是基于 HTTP 协议实现的,并且只支持服务器到客户端的单向通信。
工作原理很简单:客户端通过普通的 HTTP 请求连接到服务器,并通过特定的 HTTP 头信息告知服务器它希望保持连接以便接收实时数据。服务器接收到这个请求后,保持连接不断开,并周期性地向客户端发送消息。
2.SseEmitter实现SSE
spring-boot-starter-web自带SseEmitter,不需要额外依赖。SseEmitter是实时发送数据,主要将它用在服务器向客户端推送实时数据,如实时消息推送、状态更新等场景。
2.1 SseClient
/**
* @description: sse客户端
* @Title: SseClient
* @Author xlw
* @Package com.xlw.spring_design_demo.sse
* @Date 2025/1/18 16:33
*/
public class SseClient {
/**
* SSE 池
*/
public final static ConcurrentHashMap<String, SseEmitter> SSE_POOL = new ConcurrentHashMap<>();
public static SseEmitter createSse(String key) {
if (SSE_POOL.containsKey(key)) {
return null;
}
//默认30秒超时,设置为0L则永不超时
SseEmitter sseEmitter = new SseEmitter(0L);
sseEmitter.onCompletion(() -> {
System.out.println("完成数据发送");
SSE_POOL.remove(key);
});
sseEmitter.onTimeout(() -> {
System.out.println("超时");
SSE_POOL.remove(key);
});
sseEmitter.onError(e -> {
System.out.println("发生错误");
SSE_POOL.remove(key);
});
try {
sseEmitter.send(SseEmitter.event().reconnectTime(5000));
} catch (IOException e) {
e.printStackTrace();
}
SSE_POOL.put(key, sseEmitter);
return sseEmitter;
}
public static void sendMessage(String key, String messageId, String message) {
if (!SSE_POOL.containsKey(key)) {
throw new RuntimeException(key + "不存在");
}
try {
SSE_POOL.get(key).send(SseEmitter.event().id(messageId).reconnectTime(1*60*1000L).data(message));
} catch (IOException e) {
SSE_POOL.remove(key);
e.printStackTrace();
}
}
public static void closeSse(String key) {
if (SSE_POOL.containsKey(key)) {
SseEmitter sseEmitter = SSE_POOL.get(key);
//数据发送完毕
sseEmitter.complete();
SSE_POOL.remove(key);
}
}
}2.2 测试Controller
/**
* @module:
* @Title: SseController
* @Author xlw
* @Package com.xlw.spring_design_demo.sse
* @Date 2025/1/18 16:47
*/
@Controller
@RequestMapping("sse")
public class SseController {
@GetMapping(path = "createSse", produces = "text/event-stream")
public SseEmitter createSse(String key, HttpServletResponse respons) {
respons.setContentType("text/event-stream");
return SseClient.createSse(key);
}
@ResponseBody
@GetMapping("sendMessage")
public Boolean sendMessage(String key, String messageId, String message) {
SseClient.sendMessage(key, messageId, message);
return true;
}
@ResponseBody
@GetMapping("closeSse")
public Boolean closeSse(String key) {
SseClient.closeSse(key);
return true;
}
}2.3 效果

2.4 说明
SSE 如何建立连接
- 客户端启动一个 SSE 连接: 客户端使用
EventSource接口创建一个到服务器的连接。- HTTP 请求: 客户端向服务器发送一个 GET 请求,并在请求头中加入
Accept: text/event-stream表明这是一个 SSE 连接请求。- 服务器响应: 服务器处理这个请求后,保持该连接打开,并设置响应头
Content-Type: text/event-stream,告诉客户端后续的内容将是事件流。- 发送消息: 服务器周期性地发送格式化的消息数据给客户端,每条消息都是纯文本,以 “data: “ 开头,后面是消息内容,并以连续的两个换行符
\n\n结束。
SSE 数据流的格式与传输方式
SSE 中,数据是以纯文本格式通过持续的 HTTP 响应传输的。有几个字段可以用来构成一个 SSE 消息:
- event: 可以定义事件的类型,默认为 “message”。
- data: 消息的主体数据。
- id: 事件的唯一标识符。
- retry: 重连时间间隔,告知客户端在断开连接时应等待多少毫秒后重试连接。
一个典型的 SSE 数据包如下所示:
data: This is a message\n\n或者带有事件类型的:
event: userupdate
data: {"username": "john_doe", "status": "online"}
\n\nSSE 的典型使用场景
| 应用场景 | 描述 |
|---|---|
| 实时通知 | 比如社交媒体平台中,当有新消息或动态时,服务器会立即通知客户端,允许用户及时看到更新。 |
| 实时数据更新 | 股票行情:股票市场的价格经常变动,SSE 可以让用户实时看到新的股票价格而不需要手动刷新页面。 体育比分:体育赛事的比分、统计信息等,可以通过 SSE 在比赛进行时实时更新。 |
| 地理位置追踪 | 在地图应用中,SSE 可以用于监控特定物体的位置变化,并持续将更新推送给客户端。 |
| 系统监控 | 服务器可以将系统的实时运行状态通过 SSE 发送给管理界面,便于管理员及时了解系统状况。 |
总的来说,SSE 是一种非常适合进行单方向数据流更新的技术,特别是在不需要客户端到服务器的通信时,它比 WebSocket 更为简单和高效。它不适合那些需要全双工通信的场景,例如需要频繁双向交互的在线游戏或聊天应用。
3.ResponseBodyEmitter实现SSE
ResponseBodyEmitter是SseEmitter的父类适用于要动态生成内容并逐步发送给客户端的场景,例如:文件上传进度、实时日志等,可以在任务执行过程中逐步向客户端发送更新。
3.1 代码实现
@GetMapping(path = "createEmitter")
public ResponseEntity<ResponseBodyEmitter> createEmitter() {
// 创建一个ResponseBodyEmitter,-1代表不超时
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
emitter.onCompletion(() -> {
System.out.println("完成数据发送");
});
emitter.onTimeout(() -> {
System.out.println("超时");
});
emitter.onError(e -> {
System.out.println("发生异常");
});
new Thread(() -> {
try {
for (int i = 0; i < 100; i++) {
emitter.send("Progress: " + i + "%\n", MediaType.TEXT_HTML);
Thread.sleep(100);
System.out.println("发送:" + i);
}
emitter.complete();
} catch (Exception e) {
e.printStackTrace();
emitter.completeWithError(e);
}
}).start();
return ResponseEntity.ok().contentType(MediaType.TEXT_HTML).body(emitter);
}3.2 效果

在Postman中调用是需要等待所有数据完成后才能返回结果:

4.StreamingResponseBody实现SSE
StreamingResponseBody 与其他响应处理方式略有不同,主要用于处理大数据量或持续数据流的传输,支持将数据直接写入OutputStream。
例如,当我们需要下载一个超大文件时,使用 StreamingResponseBody 可以避免将文件数据一次性加载到内存中,而是持续不断的把文件流发送给客户端,从而解决下载大文件时常见的内存溢出问题。
接口实现直接返回 StreamingResponseBody 对象,将数据写入输出流并刷新,调用一次flush就会向客户端写入一次数据。
4.1 代码
@GetMapping(path = "createStreamingResponseBody")
public ResponseEntity<StreamingResponseBody> createStreamingResponseBody() {
StreamingResponseBody responseBody = new StreamingResponseBody() {
//异步执行
@Override
public void writeTo(OutputStream outputStream) throws IOException {
for (int i = 0; i < 100; i++) {
outputStream.write(("Progress: " + i + "%\r\n").getBytes());
try {
Thread.sleep(100);
outputStream.flush();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("发送:" + i);
}
outputStream.close();
}
};
return ResponseEntity.ok().contentType(MediaType.TEXT_HTML).body(responseBody);
}4.2 效果
