Spring WebSocket实现实时通信的详细教程

简介

WebSocket 是基于TCP/IP协议,独立于HTTP协议的通信协议。WebSocket 连接允许客户端和服务器之间的全双工通信,以便任何一方都可以通过已建立的连接将数据推送到另一方。

我们常用的HTTP是客户端通过「请求-响应」的方式与服务器建立通信的,必须是客户端主动触发的行为,服务端只是做好接口被动等待请求。而在某些场景下的动作,是需要服务端主动触发的,比如向客户端发送消息、实时通讯、远程控制等。客户端是不知道这些动作几时触发的,假如用HTTP的方式,那么设备端需要不断轮询服务端,这样的方式对服务器压力太大,同时产生很多无效请求,且具有延迟性。于是才采用可以建立双向通讯的长连接协议。通过握手建立连接后,服务端可以实时发送数据与指令到设备端,服务器压力小。

Spring WebSocket是Spring框架的一部分,提供了在Web应用程序中实现实时双向通信的能力。本教程将引导你通过一个简单的例子,演示如何使用Spring WebSocket建立一个实时通信应用。

准备工作

确保你的项目中已经引入了Spring框架的WebSocket模块。你可以通过Maven添加以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

创建WebSocket配置类(实现WebSocketConfigurer接口)

首先,创建一个配置类,用于配置WebSocket的相关设置。

package com.ci.erp.human.config;

import com.ci.erp.human.handler.WebSocketHandler;
import com.ci.erp.human.interceptor.WebSocketHandleInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

/**
 *
 * Websocket配置类
 *
 * @author lucky_fd
 * @since 2024-01-17
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // 注册websocket处理器和拦截器
        registry.addHandler(webSocketHandler(), "/websocket/server")
                .addInterceptors(webSocketHandleInterceptor()).setAllowedOrigins("*");
        registry.addHandler(webSocketHandler(), "/sockjs/server").setAllowedOrigins("*")
                .addInterceptors(webSocketHandleInterceptor()).withSockJS();
    }

    @Bean
    public WebSocketHandler webSocketHandler() {
        return new WebSocketHandler();
    }

    @Bean
    public WebSocketHandleInterceptor webSocketHandleInterceptor() {
        return new WebSocketHandleInterceptor();
    }
}

上面的配置类使用@EnableWebSocket注解启用WebSocket,并通过registerWebSocketHandlers方法注册WebSocket处理器。

  • registerWebSocketHandlers:这个方法是向spring容器注册一个handler处理器及对应映射地址,可以理解成MVC的Handler(控制器方法),websocket客户端通过请求的url查找处理器进行处理

  • addInterceptors:拦截器,当建立websocket连接的时候,我们可以通过继承spring的HttpSessionHandshakeInterceptor来做一些事情。

  • setAllowedOrigins:跨域设置,*表示所有域名都可以,不限制, 域包括ip:port, 指定*可以是任意的域名,不加的话默认localhost+本服务端口

  • withSockJS: 这个是应对浏览器不支持websocket协议的时候降级为轮询的处理。

创建WebSocket消息处理器(实现TextWebSocketHandler 接口)

接下来,创建一个消息处理器,处理客户端发送的消息。

package com.ci.erp.human.handler;

import cn.hutool.core.util.ObjectUtil;
import com.ci.erp.common.core.utils.JsonUtils;
import com.ci.erp.human.domain.thirdVo.YYHeartbeat;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 *
 * websocket处理类
 * 实现WebSocketHandler接口
 *
 * - websocket建立连接后执行afterConnectionEstablished回调接口
 * - websocket关闭连接后执行afterConnectionClosed回调接口
 * - websocket接收客户端消息执行handleTextMessage接口
 * - websocket传输异常时执行handleTransportError接口
 *
 * @author lucky_fd
 * @since 2024-01-17
 */

public class WebSocketHandler extends TextWebSocketHandler {

    /**
     * 存储websocket客户端连接
     * */
    private static final Map<String, WebSocketSession> connections = new HashMap<>();

    /**
     * 建立连接后触发
     * */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        System.out.println("成功建立websocket连接");
        // 建立连接后将连接以键值对方式存储,便于后期向客户端发送消息
        // 以客户端连接的唯一标识为key,可以通过客户端发送唯一标识
        connections.put(session.getRemoteAddress().getHostName(), session);
        System.out.println("当前客户端连接数:" + connections.size());
    }

    /**
     * 接收消息
     * */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        System.out.println("收到消息: " + message.getPayload());
		
		// 收到客户端请求消息后进行相应业务处理,返回结果
        this.sendMessage(session.getRemoteAddress().getHostName(),new TextMessage("收到消息: " + message.getPayload()));
    }

    /**
     * 传输异常处理
     * */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        super.handleTransportError(session, exception);
    }

    /**
     * 关闭连接时触发
     * */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        System.out.println("触发关闭websocket连接");
        // 移除连接
        connections.remove(session.getRemoteAddress().getHostName());
    }

    @Override
    public boolean supportsPartialMessages() {
        return super.supportsPartialMessages();
    }

    /**
     * 向连接的客户端发送消息
     *
     * @author lucky_fd
     * @param clientId 客户端标识
     * @param message 消息体
     **/
    public void sendMessage(String clientId, TextMessage message) {
        for (String client : connections.keySet()) {
            if (client.equals(clientId)) {
                try {
                    WebSocketSession session = connections.get(client);
                    // 判断连接是否正常
                    if (session.isOpen()) {
                        session.sendMessage(message);
                    }
                } catch (IOException e) {
                    System.out.println(e.getMessage());
                }
                break;
            }
        }
    }
}

通过消息处理器,在开发中我们就可以实现向指定客户端或所有客户端发送消息,实现相应业务功能。

创建拦截器

拦截器会在握手时触发,可以用来进行权限验证

package com.ci.erp.human.interceptor;

import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;

import java.util.Map;

/**
 *
 * Websocket拦截器类
 *
 * @author lucky_fd
 * @since 2024-01-17
 */

public class WebSocketHandleInterceptor extends HttpSessionHandshakeInterceptor {

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        System.out.println("拦截器前置触发");
        return super.beforeHandshake(request, response, wsHandler, attributes);
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {
        System.out.println("拦截器后置触发");
        super.afterHandshake(request, response, wsHandler, ex);
    }
}

创建前端页面客户端

最后,创建一个简单的HTML页面,用于接收用户输入并显示实时聊天信息。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Spring WebSocket Chat</title>
    <script src="https://code.jquery.com/jquery-3.6.4.min.js"></script>
    <script src="http://cdn.bootcss.com/sockjs-client/1.1.1/sockjs.js"></script>
</head>
<body>

请输入:<input type="text" id="message" placeholder="Type your message">
<button onclick="sendMessage()">Send</button>
<button onclick="websocketClose()">关闭连接</button>
<div id="chat"></div>

<script>
    var socket = null;
    if ('WebSocket' in window) {
    	// 后端服务port为22900
        socket = new WebSocket("ws://localhost:22900/websocket/server");
    } else if ('MozWebSocket' in window) {
        socket = new MozWebSocket("ws://localhost:22900/websocket/server");
    } else {
        socket = new SockJS("http://localhost:22900/sockjs/server");
    }

    // 接收消息触发
    socket.onmessage = function (event) {
        showMessage(event.data);
    };
    // 创建连接触发
    socket.onopen = function (event) {
        console.log(event.type);
    };
    // 连接异常触发
    socket.onerror = function (event) {
        console.log(event)
    };
    // 关闭连接触发
    socket.onclose = function (closeEvent) {
        console.log(closeEvent.reason);
    };

    //发送消息
    function sendMessage() {
        if (socket.readyState === socket.OPEN) {
            var message = document.getElementById('message').value;
            socket.send(message);
            console.log("发送成功!");
        } else {
            console.log("连接失败!");
        }

    }

    function showMessage(message) {
        document.getElementById('chat').innerHTML += '<p>' + message + '</p>';
    }

    function websocketClose() {
        socket.close();
        console.log("连接关闭");
    }

    window.close = function () {
        socket.onclose();
    };

</script>

</body>
</html>

这个页面使用了WebSocket对象来建立连接,并通过onmessage监听收到的消息。通过输入框发送消息,将会在页面上显示。

测试结果:

后端日志:

前端界面:

Java客户端

添加依赖

<dependency>
      <groupId>org.java-websocket</groupId>
      <artifactId>Java-WebSocket</artifactId>
      <version>1.4.0</version>
</dependency>

创建客户端类(继承WebsocketClient)

package com.river.websocket;
 
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
 
import java.net.URI;
import java.net.URISyntaxException;
 
public class MyWebSocketClient extends WebSocketClient {
 
    MyWebSocketClient(String url) throws URISyntaxException {
        super(new URI(url));
    }
 	// 建立连接
    @Override
    public void onOpen(ServerHandshake shake) {
        System.out.println(shake.getHttpStatusMessage());
    }
 	// 接收消息
    @Override
    public void onMessage(String paramString) {
        System.out.println(paramString);
    }
 	// 关闭连接
    @Override
    public void onClose(int paramInt, String paramString, boolean paramBoolean) {
        System.out.println("关闭");
    }
 	// 连接异常
    @Override
    public void onError(Exception e) {
        System.out.println("发生错误");
    }
}

测试websocket

package com.river.websocket;
 
import org.java_websocket.enums.ReadyState;
 
import java.net.URISyntaxException;
 
/**
 * @author lucky_fd
 * @date 2024-1-17
 */
public class Client {
    public static void main(String[] args) throws URISyntaxException, InterruptedException {
        MyWebSocketClient client = new MyWebSocketClient("ws://localhost:22900/websocket/server");
        client.connect();
        while (client.getReadyState() != ReadyState.OPEN) {
            System.out.println("连接状态:" + client.getReadyState());
            Thread.sleep(100);
        }
        client.send("测试数据!");
        client.close();
    }
}

进阶实战应用

Websocket消息处理器

基于事件订阅机制实现消息的处理,使用Spring事件机制实现消息处理

/**
 *
 * websocket处理类
 * 实现WebSocketHandler接口
 *
 * @author lucky_fd
 * @since 2024-01-17
 */
public class WebSocketHandler extends TextWebSocketHandler {

    /**
     * 使用ConcurrentHashMap存放ws连接
     * */
    private static final Map<String, WebSocketSession> connections = new ConcurrentHashMap<>();

    /**
     * 存放当前连接ip及名称
     * */
    public static final Map<String, String> ipMap = new ConcurrentHashMap<>();

    @Autowired
    private ApplicationEventPublisher publisher;

    @Autowired
    private RedisService redisService;

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        System.out.println("建立连接");
        String host = IpUtil.getIpAddr(session);
        connections.put(host, session);
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
    	// 获取消息并转换为实体类
        WSRespVo respVo = JsonUtils.parseObject(message.getPayload(), WSRespVo.class);
        System.out.println("收到消息: " + respVo);

        // 发布事件消息
        publisher.publishEvent(new YYWebsocketRespEvent(respVo, session));

        // this.sendMessage(session.getRemoteAddress().getHostName(),new TextMessage("收到消息: " + message.getPayload()));
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        super.handleTransportError(session, exception);
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        // 移除连接
        String host = IpUtil.getIpAddr(session);
        connections.remove(host);
        // 删除在线列表
        redisService.deleteCacheMapByKey(CacheConstants.HUMAN_YY_CLOCK_ONLINE, ipMap.get(host));

        System.out.println("连接关闭:" + host);
    }

    @Override
    public boolean supportsPartialMessages() {
        return super.supportsPartialMessages();
    }

    /**
     * 向连接的客户端发送消息
     *
     * @author lucky_fd
     * @param clientId 客户端标识
     * @param message 消息体
     **/
    public void sendMessage(String clientId, TextMessage message) {
        for (String client : WebSocketHandler.connections.keySet()) {
            if (client.equals(clientId)) {
                try {
                    WebSocketSession session = WebSocketHandler.connections.get(client);
                    // 判断连接是否正常
                    if (session.isOpen()) {
                        session.sendMessage(message);
                    }
                } catch (IOException e) {
                    System.out.println(e.getMessage());
                }
                break;
            }
        }
    }

    /**
     * 向连接的所有客户端发送消息
     *
     * @author lucky_fd
     * @param message 消息体
     * @return int 成功发送的客户端数量
     **/
    public int sendMessageEvery(TextMessage message) {
        int count = 0;
        for (WebSocketSession session : WebSocketHandler.connections.values()) {
            // 判断连接是否正常
            if (session.isOpen()) {
                try {
                    session.sendMessage(message);
                    count++;
                } catch (IOException ignored) {

                }
            }
        }
        return count;
    }
}

定义事件类

/**
 * 
 * websocket响应事件
 * @author lucky_fd
 * @since 2024-03-06
 */
@Getter
public class YYWebsocketRespEvent extends ApplicationEvent {

    /**
     * 响应指令
     * @see EmWSRespCmd
     * */
    private final String cmd;

    private final WebSocketSession session;

    public YYWebsocketRespEvent(WSRespVo source, WebSocketSession session) {
        super(source);
        this.cmd = source.getCmd();
        this.session = session;
    }
}

定义事件监听器

/**
 * websocket响应事件监听
 *
 * @author lucky_fd
 * @since 2024-03-06
 */
@Component
public class YYWebsocketRespEventListener {

    @Autowired
    private WebSocketHandler webSocketHandler; // 注入消息处理器

    @Autowired
    private RedisService redisService;

    /**
     * 客户端声明事件
     * 设备连接上 websocket 后调用该接口
     *
     * @param respEvent 事件消息
     * @author lucky_fd
     **/
    @EventListener(condition = "#respEvent.cmd == 'declare'")
    public void declare(YYWebsocketRespEvent respEvent) {
    	// 根据不同的事件实现自定义业务处理
        WSRespVo source = (WSRespVo) respEvent.getSource();
        // 博主这里是将连接的客户端存放到redis,以便后期的业务处理
        redisService.setCacheMapValue(CacheConstants.HUMAN_YY_CLOCK_ONLINE, source.getSn(), source);

        WebSocketHandler.ipMap.put(source.getIp(), source.getSn());
    }

    /**
     * 客户端心跳事件
     *
     * @param respEvent 事件消息
     * @author lucky_fd
     **/
    @EventListener(condition = "#respEvent.cmd == 'ping'")
    public void ping(YYWebsocketRespEvent respEvent) {
    	// 根据不同的事件实现自定义业务处理
        String ip = IpUtil.getIpAddr(respEvent.getSession());
        String pong = JsonUtils.toJsonString(new YYHeartbeat("pong"));
        // 发送心跳响应
        webSocketHandler.sendMessage(ip, new TextMessage(pong));
    }

    /**
     * 客户端响应指令
     *
     * @param respEvent 事件消息
     * @author lucky_fd
     **/
    @EventListener(condition = "#respEvent.cmd == 'to_device'")
    public void toDevice(YYWebsocketRespEvent respEvent) {
        WSRespVo source = (WSRespVo) respEvent.getSource();
        System.out.println("客户端响应指令: " + JsonUtils.toJsonString(source));

    }
}

Nginx转发websocket配置

server {
	
	# 日志记录格式
	log_format  main  '$remote_addr - $remote_user [$time_local] "$request" ' 
					'$status $body_bytes_sent "$http_referer" ' 
					'"$http_user_agent" "$http_x_forwarded_for"' 'upstreamIP: $upstream_addr' 		
					'upgrade: $http_upgrade';


	listen          80; # 监听端口
	server_name     localhost;

	location ^~ /WS/ {
        proxy_pass http://192.168.20.2:8080; # 代理到服务网关

		proxy_set_header Upgrade $http_upgrade; # 
		proxy_set_header Connection "upgrade";
		proxy_set_header   Host             $host;
		proxy_set_header   X-Real-IP        $remote_addr;
		proxy_set_header   X-Forwarded-For  $proxy_add_x_forwarded_for;

		access_log logs/erp/access.log main; # 记录解析日志
        error_log logs/erp/error.log; # 记录错误日志

	}
}

Gateway网关转发websocket配置

Spring Cloud Gateway的路由配置

- id: websocket
  #uri: ws://127.0.0.1:22900 #使用方式1:websocket配置,直接地址
  uri: lb:ws://ci-erp-human  #使用方式2:websocket配置,通过nacos注册中心调用serviceName
  predicates: # 断言工厂
    - Path=/WS/** # 这是一个路径断言,表示只有当请求的路径以/WS/开头时,这个路由才会被匹配。**是一个通配符,表示匹配任意子路径。
  filters: # 过滤器
    - RewritePath=/WS/(?<segment>.*),/$\{segment} # 这是一个路径重写过滤器。它将匹配到的/WS/路径下的任意子路径(由正则表达式的(?<segment>.*)捕获)重写为仅包含该子路径(由$\{segment}引用捕获的组)。例如,如果请求的路径是/WS/some/path,那么重写后的路径将是/some/path
  • Gateway转发消息体过大会导致websocket连接断开问题解决:SpringCloud Gateway转发Websocket并修改消息体大小限制

参考链接:

  • 通过注解方式实现websocket:springboot集成websocket持久连接(权限过滤+拦截)
  • spring websocket实现前后端通信
温馨提示:本文发布时间2024-01-29 11:03:00 更新于2024-01-29 11:03:00,某些文章具有时效性,若有错误或已失效,请在下方留言或联系站长
© 版权声明
THE END
喜欢就支持一下吧
点赞0
分享
评论 抢沙发