SpringBoot使用SSE


SpringBoot使用SSE

1.什么是SSE?

大家都知道HTTP协议是单向请求的,只能客户端向服务端发送请求,服务端收到请求后响应客户端;然而项目中有时需要服务端主动向客户端发送消息,通常我们都使用WebSocket来实现这一功能,WebSocket是双向通信的即客户端和服务端可以相互发送数据;有时我们仅仅需要服务端能够持续发送数据给客户端,这时我们可以使用SSE;

SSE 的工作原理

SSE (Server-Sent Events) 是一种使服务器能实时、单向地发送信息到客户端的技术。与 WebSocket 不同的是,SSE 是基于 HTTP 协议实现的,并且只支持服务器到客户端的单向通信。

工作原理很简单:客户端通过普通的 HTTP 请求连接到服务器,并通过特定的 HTTP 头信息告知服务器它希望保持连接以便接收实时数据。服务器接收到这个请求后,保持连接不断开,并周期性地向客户端发送消息。

2.SseEmitter实现SSE

spring-boot-starter-web自带SseEmitter,不需要额外依赖。SseEmitter是实时发送数据,主要将它用在服务器向客户端推送实时数据,如实时消息推送、状态更新等场景。

2.1 SseClient

/**
 * @description: sse客户端
 * @Title: SseClient
 * @Author xlw
 * @Package com.xlw.spring_design_demo.sse
 * @Date 2025/1/18 16:33
 */
public class SseClient {

    /**
     * SSE 池
     */
    public final static ConcurrentHashMap<String, SseEmitter> SSE_POOL = new ConcurrentHashMap<>();

    public static SseEmitter createSse(String key) {
        if (SSE_POOL.containsKey(key)) {
            return null;
        }
        //默认30秒超时,设置为0L则永不超时
        SseEmitter sseEmitter = new SseEmitter(0L);
        sseEmitter.onCompletion(() -> {
            System.out.println("完成数据发送");
            SSE_POOL.remove(key);
        });

        sseEmitter.onTimeout(() -> {
            System.out.println("超时");
            SSE_POOL.remove(key);
        });

        sseEmitter.onError(e -> {
            System.out.println("发生错误");
            SSE_POOL.remove(key);
        });
        try {
            sseEmitter.send(SseEmitter.event().reconnectTime(5000));
        } catch (IOException e) {
            e.printStackTrace();
        }
        SSE_POOL.put(key, sseEmitter);
        return sseEmitter;
    }

    public static void sendMessage(String key, String messageId, String message) {
        if (!SSE_POOL.containsKey(key)) {
            throw new RuntimeException(key + "不存在");
        }
        try {
            SSE_POOL.get(key).send(SseEmitter.event().id(messageId).reconnectTime(1*60*1000L).data(message));
        } catch (IOException e) {
            SSE_POOL.remove(key);
            e.printStackTrace();
        }
    }

    public static void closeSse(String key) {
        if (SSE_POOL.containsKey(key)) {
            SseEmitter sseEmitter = SSE_POOL.get(key);
            //数据发送完毕
            sseEmitter.complete();
            SSE_POOL.remove(key);
        }
    }
}

2.2 测试Controller

/**
 * @module:
 * @Title: SseController
 * @Author xlw
 * @Package com.xlw.spring_design_demo.sse
 * @Date 2025/1/18 16:47
 */
@Controller
@RequestMapping("sse")
public class SseController {

    @GetMapping(path = "createSse", produces = "text/event-stream")
    public SseEmitter createSse(String key, HttpServletResponse respons) {
        respons.setContentType("text/event-stream");
        return SseClient.createSse(key);
    }

    @ResponseBody
    @GetMapping("sendMessage")
    public Boolean sendMessage(String key, String messageId, String message) {
        SseClient.sendMessage(key, messageId, message);
        return true;
    }

    @ResponseBody
    @GetMapping("closeSse")
    public Boolean closeSse(String key) {
        SseClient.closeSse(key);
        return true;
    }
}

2.3 效果

2.4 说明

SSE 如何建立连接

  1. 客户端启动一个 SSE 连接: 客户端使用 EventSource 接口创建一个到服务器的连接。
  2. HTTP 请求: 客户端向服务器发送一个 GET 请求,并在请求头中加入 Accept: text/event-stream 表明这是一个 SSE 连接请求。
  3. 服务器响应: 服务器处理这个请求后,保持该连接打开,并设置响应头 Content-Type: text/event-stream,告诉客户端后续的内容将是事件流。
  4. 发送消息: 服务器周期性地发送格式化的消息数据给客户端,每条消息都是纯文本,以 “data: “ 开头,后面是消息内容,并以连续的两个换行符 \n\n 结束。

SSE 数据流的格式与传输方式

SSE 中,数据是以纯文本格式通过持续的 HTTP 响应传输的。有几个字段可以用来构成一个 SSE 消息:

  • event: 可以定义事件的类型,默认为 “message”。
  • data: 消息的主体数据。
  • id: 事件的唯一标识符。
  • retry: 重连时间间隔,告知客户端在断开连接时应等待多少毫秒后重试连接。
    一个典型的 SSE 数据包如下所示:
data: This is a message\n\n

或者带有事件类型的:

event: userupdate
data: {"username": "john_doe", "status": "online"}
\n\n

SSE 的典型使用场景

应用场景描述
实时通知比如社交媒体平台中,当有新消息或动态时,服务器会立即通知客户端,允许用户及时看到更新。
实时数据更新股票行情:股票市场的价格经常变动,SSE 可以让用户实时看到新的股票价格而不需要手动刷新页面。 体育比分:体育赛事的比分、统计信息等,可以通过 SSE 在比赛进行时实时更新。
地理位置追踪在地图应用中,SSE 可以用于监控特定物体的位置变化,并持续将更新推送给客户端。
系统监控服务器可以将系统的实时运行状态通过 SSE 发送给管理界面,便于管理员及时了解系统状况。

总的来说,SSE 是一种非常适合进行单方向数据流更新的技术,特别是在不需要客户端到服务器的通信时,它比 WebSocket 更为简单和高效。它不适合那些需要全双工通信的场景,例如需要频繁双向交互的在线游戏或聊天应用。

3.ResponseBodyEmitter实现SSE

ResponseBodyEmitterSseEmitter的父类适用于要动态生成内容并逐步发送给客户端的场景,例如:文件上传进度、实时日志等,可以在任务执行过程中逐步向客户端发送更新。

3.1 代码实现

@GetMapping(path = "createEmitter")
public ResponseEntity<ResponseBodyEmitter> createEmitter() {
    // 创建一个ResponseBodyEmitter,-1代表不超时
    ResponseBodyEmitter emitter = new ResponseBodyEmitter();
    emitter.onCompletion(() -> {
        System.out.println("完成数据发送");
    });
    emitter.onTimeout(() -> {
        System.out.println("超时");
    });
    emitter.onError(e -> {
        System.out.println("发生异常");
    });
    new Thread(() -> {
        try {
            for (int i = 0; i < 100; i++) {
                emitter.send("Progress: " + i + "%\n", MediaType.TEXT_HTML);
                Thread.sleep(100);
                System.out.println("发送:" + i);
            }
            emitter.complete();
        } catch (Exception e) {
            e.printStackTrace();
            emitter.completeWithError(e);
        }
    }).start();
    return ResponseEntity.ok().contentType(MediaType.TEXT_HTML).body(emitter);
}

3.2 效果

Postman中调用是需要等待所有数据完成后才能返回结果:

4.StreamingResponseBody实现SSE

StreamingResponseBody 与其他响应处理方式略有不同,主要用于处理大数据量或持续数据流的传输,支持将数据直接写入OutputStream

例如,当我们需要下载一个超大文件时,使用 StreamingResponseBody 可以避免将文件数据一次性加载到内存中,而是持续不断的把文件流发送给客户端,从而解决下载大文件时常见的内存溢出问题。

接口实现直接返回 StreamingResponseBody 对象,将数据写入输出流并刷新,调用一次flush就会向客户端写入一次数据。

4.1 代码

@GetMapping(path = "createStreamingResponseBody")
public ResponseEntity<StreamingResponseBody> createStreamingResponseBody() {
    StreamingResponseBody responseBody = new StreamingResponseBody() {
        //异步执行
        @Override
        public void writeTo(OutputStream outputStream) throws IOException {
            for (int i = 0; i < 100; i++) {
                outputStream.write(("Progress: " + i + "%\r\n").getBytes());
                try {
                    Thread.sleep(100);
                    outputStream.flush();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("发送:" + i);
            }
            outputStream.close();
        }
    };
    return ResponseEntity.ok().contentType(MediaType.TEXT_HTML).body(responseBody);
}

4.2 效果


文章作者: 威@猫
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 威@猫 !
评论
  目录