Multi-User Chat and Message Push with Spring Boot + WebSocket + STOMP

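A multi-user online chat client in Java, built on Spring Boot + WebSocket (STOMP).
After reading through a lot of material, I finally got a simple version of the module I had in mind working.
GitHub: https://github.com/marlkiller/SpringBoot-WebSocket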

Finished demo (with private chat)

Project file structure

Implementation

WebSocket configuration

@Configuration
@EnableWebSocketMessageBroker
WebSocketStompConfig implements WebSocketMessageBrokerConfigurer

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {

        // SockJS-backed WebSocket endpoint for web clients.
        // Note: on Spring Framework 5.3+, a "*" origin combined with SockJS may be rejected;
        // setAllowedOriginPatterns("*") is the replacement there.
        registry.addEndpoint("/webSocketServer").addInterceptors(new HttpSessionHandshakeInterceptor() {
            @Override
            public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> attributes) throws Exception {
                // Store the client's IP address in the WebSocket session attributes
                // (getIpAddress is a helper that is not shown in this post)
                String ipAddress = getIpAddress(serverHttpRequest);
                attributes.put("ipAddress", ipAddress);
                return super.beforeHandshake(serverHttpRequest, serverHttpResponse, webSocketHandler, attributes);
            }

            @Override
            public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
                // Nothing to do after the handshake
            }
        }).setAllowedOrigins("*").withSockJS();
    }

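The registry is meant to hold two endpoints: one WebSocket endpoint for the web client and one for native clients (Android/iOS). Only the web endpoint appears in the snippet above. addInterceptors lets you supply your own handshake interceptor, for example to record or filter client IP addresses.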
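The interceptor above calls getIpAddress, which the post does not show. Here is a minimal sketch of what such a helper might do, assuming the application may sit behind a reverse proxy that sets X-Forwarded-For. The ClientIpResolver class and its resolve method are hypothetical names, not taken from the repository:

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper: picks the client IP from proxy headers, falling back to the socket address. */
final class ClientIpResolver {

    private ClientIpResolver() {
    }

    /**
     * Returns the first address listed in X-Forwarded-For (header name matched
     * case-insensitively), or remoteAddress when the header is absent or unusable.
     */
    static String resolve(Map<String, List<String>> headers, String remoteAddress) {
        for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
            if (!"X-Forwarded-For".equalsIgnoreCase(entry.getKey()) || entry.getValue() == null) {
                continue;
            }
            for (String value : entry.getValue()) {
                if (value == null) {
                    continue;
                }
                // The header may hold a comma-separated chain; the first entry is the original client
                int comma = value.indexOf(',');
                String first = (comma >= 0 ? value.substring(0, comma) : value).trim();
                if (!first.isEmpty() && !"unknown".equalsIgnoreCase(first)) {
                    return first;
                }
            }
        }
        return remoteAddress;
    }
}
```

Inside beforeHandshake, the request headers and the remote socket address of the ServerHttpRequest would be passed in. X-Forwarded-For can be forged by clients, so it is only trustworthy when a proxy you control sets it.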

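    /**
     * Configure the message broker
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Broker destination prefixes: /topic for one-to-many, /user for one-to-one
        registry.enableSimpleBroker("/topic", "/user");
        // Global application destination prefix (it becomes part of the destination paths clients use)
        // registry.setApplicationDestinationPrefixes("/prefixes");
        // Prefix for one-to-one (user) destinations; defaults to /user/ when not set
        registry.setUserDestinationPrefix("/user");
    }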

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        // ChannelInterceptorAdapter is deprecated since Spring 5.0.7;
        // newer code can implement ChannelInterceptor directly
        registration.interceptors(new ChannelInterceptorAdapter() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
                // 1. Is this the initial CONNECT frame?
                if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) {
                    // 2. Check the username and password
                    //    (getFirstNativeHeader returns null when the header is missing)
                    String username = accessor.getFirstNativeHeader("username");
                    String password = accessor.getFirstNativeHeader("password");

                    // Hard-coded demo credentials
                    if ("aaaaaa".equals(username) && "admin".equals(password)) {
                        Principal principal = new Principal() {
                            @Override
                            public String getName() {
                                return username;
                            }
                        };
                        accessor.setUser(principal);
                        return message;
                    } else {
                        // Returning null stops the frame from being processed
                        return null;
                    }
                }
                // Not a CONNECT frame: the client has already logged in
                return message;
            }

        });
    }

@Controller
WebSocketAction
This Action acts as a Controller: it handles sending and receiving user messages, plus the one-to-one and one-to-many channel subscriptions.

    /**
     * Handles messages sent to /sendTopic and broadcasts the reply (one-to-many)
     */
    @MessageMapping("/sendTopic")
    @SendTo("/topic/getResponse")
    public ServerMessage sendTopic(ClientMessage message, StompHeaderAccessor stompHeaderAccessor, Principal principal) {
        logger.info("Received message: " + message.getName());
        return new ServerMessage("One-to-many service response");
    }

    /**
     * Handles messages sent to /sendUser and replies to the sending user (one-to-one)
     */
    @MessageMapping("/sendUser")
    @SendToUser("/queue/getResponse")
    public ServerMessage sendUser(ClientMessage message, StompHeaderAccessor stompHeaderAccessor, Principal principal) {
        // The session attributes include the "ipAddress" stored by the handshake interceptor
        stompHeaderAccessor.getSessionAttributes();
        logger.info("Received message: " + message.getName());
        return new ServerMessage("One-to-one service response");
    }


    /**
     * One-to-one subscription notification
     */
    @SubscribeMapping("/user/{userId}/queue/getResponse")
    public ServerMessage subOnUser(@DestinationVariable String userId, StompHeaderAccessor stompHeaderAccessor, Principal principal) {
        stompHeaderAccessor.setUser(new Principal() {
            @Override
            public String getName() {
                return userId;
            }
        });
        logger.info(userId + "/queue/getResponse subscribed");
        return new ServerMessage("Thanks for subscribing to the one-to-one service");
    }

    /**
     * One-to-many subscription notification
     */
    @SubscribeMapping("/topic/getResponse")
    public ServerMessage subOnTopic(Principal principal) {
        logger.info("/topic/getResponse subscribed");
        return new ServerMessage("Thanks for subscribing to the one-to-many service");
    }

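When a client makes a one-to-one subscription, the handler saves and registers the user information.
The @DestinationVariable String userId annotation is similar to @PathVariable in an MVC @Controller:
it extracts the subscriber's user ID from the destination, which makes it easy to reply in real time with that user's subscription status.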
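ClientMessage and ServerMessage are used by the handlers above but are not shown in the post. Here is a minimal sketch of what they might look like. The name property follows from message.getName() and the {"name": ...} JSON that the clients send; the responseMessage field name is an assumption. In the project each would normally be a public class in its own file.

```java
/** Payload sent by clients, e.g. {"name": "hello"}. */
class ClientMessage {

    private String name;

    // No-arg constructor so a JSON message converter can instantiate the class
    ClientMessage() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

/** Payload returned by the server; the field name is an assumption. */
class ServerMessage {

    private String responseMessage;

    ServerMessage() {
    }

    public ServerMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }

    public String getResponseMessage() {
        return responseMessage;
    }

    public void setResponseMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }
}
```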

API configuration

@RequestMapping({"","/","page"})
    public String hello() {
        return "index";
    }

    // One-to-many push (@Scheduled only fires when scheduling is enabled, e.g. with @EnableScheduling)
    @Scheduled(fixedRate = 3000)
    public void sendTopicMessage() {
        long millis = System.currentTimeMillis();
        this.messageTemplate.convertAndSend("/topic/getResponse",millis);
    }

    // One-to-one push
    @Scheduled(fixedRate = 3000)
    public void sendQueueMessage() {
        long millis = System.currentTimeMillis();
        this.messageTemplate.convertAndSendToUser("aaaaaa","/queue/getResponse",millis);
    }

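The first method just forwards to the HTML home page; the latter two are the important ones.
For testing I used scheduled tasks that send messages to the client, both broadcast and single-user. You can replace them with your own logic, such as pushing global broadcasts or system notifications.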
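As one way to replace the timers with on-demand pushes, here is a minimal sketch. MessageSender is a hypothetical stand-in for the two SimpMessagingTemplate methods used above, so that the example stays free of Spring dependencies; PushService and its method names are assumptions as well.

```java
/** Hypothetical stand-in for the two SimpMessagingTemplate methods used in the post. */
interface MessageSender {
    void convertAndSend(String destination, Object payload);

    void convertAndSendToUser(String user, String destination, Object payload);
}

/** Sends broadcasts and per-user notifications on demand instead of on a timer. */
class PushService {

    // Same destinations as in the scheduled methods above
    static final String TOPIC = "/topic/getResponse";
    static final String USER_QUEUE = "/queue/getResponse";

    private final MessageSender sender;

    PushService(MessageSender sender) {
        this.sender = sender;
    }

    /** Global broadcast: every subscriber of the topic receives the text. */
    void broadcast(String text) {
        sender.convertAndSend(TOPIC, text);
    }

    /** System notification for a single user. */
    void notifyUser(String username, String text) {
        sender.convertAndSendToUser(username, USER_QUEUE, text);
    }
}
```

In the application, a small adapter would delegate both MessageSender methods to the injected messageTemplate, and any controller or service could then call broadcast or notifyUser when an event occurs.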


Web client
Depends on two JS libraries:

– sockjs.min.js
– stomp.min.js

<script th:src="@{/js/sockjs.min.js}"></script>
<script th:src="@{/js/stomp.min.js}"></script>
<script type="text/javascript">
    var stompClient;
    // var baseUrl = "http://localhost:8080/web-websocket/";

    // Create the connection object (the connection is not opened yet)
    function connect() {
        var userId = "aaaaaa";

        var socket = new SockJS(getRootPath() + "webSocketServer");

        // Get a client object for the STOMP sub-protocol
        stompClient = Stomp.over(socket);
        // Open the WebSocket connection to the server and send the CONNECT frame
        stompClient.connect(
            {
                username: userId,
                password: 'admin'
            },
            function connectCallback(frame) {
                // Called on success (the server replied with a CONNECTED frame)
                setMessageInnerHTML("Connected");

                // One-to-one subscription
                setMessageInnerHTML("subscribe (one-to-one) > /user/" + userId + "/queue/getResponse");
                stompClient.subscribe('/user/' + userId + '/queue/getResponse', function (response) {
                    setMessageInnerHTML("/user/" + userId + "/queue/getResponse getMsg :" + response.body);
                });

                // One-to-many subscription
                setMessageInnerHTML("subscribe (one-to-many) > /topic/getResponse");
                stompClient.subscribe('/topic/getResponse', function (response) {
                    setMessageInnerHTML("/topic/getResponse getMsg :" + response.body);
                });
            },
            function errorCallBack(error) {
                // Called on failure (the server replied with an ERROR frame)
                setMessageInnerHTML("Connection failed");
            }
        );

    }

    // Send messages
    function send() {
        var message = document.getElementById('text').value;
        var messageJson = JSON.stringify({"name": message});

        // One-to-many: send to the broadcast handler
        stompClient.send("/sendTopic", {}, messageJson);
        setMessageInnerHTML("/sendTopic you sent: " + message);

        // One-to-one: send to the per-user handler
        stompClient.send("/sendUser", {}, messageJson);
        setMessageInnerHTML("/sendUser you sent: " + message);
    }


    // Show a message on the page
    function setMessageInnerHTML(innerHTML) {
        document.getElementById('message').innerHTML += innerHTML + '<br/>';
    }

    // Build the application root URL from the current location,
    // treating the first path segment as the context path
    function getRootPath() {
        var curWwwPath = window.document.location.href;
        var pathName = window.document.location.pathname;
        var pos = curWwwPath.indexOf(pathName);
        var localhostPath = curWwwPath.substring(0, pos);
        var projectName = pathName.substring(0, pathName.substr(1).indexOf('/') + 1);
        return (localhostPath + projectName + "/");
    }
</script>

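Android/Java client

import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;

import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompFrameHandler;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandler;
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.WebSocketTransport;
// JSONObject comes from the JSON library used by the project (its import is not shown in the post)

public class StompClient {

    public static void main(String[] args) throws InterruptedException {

        // Never counted down: it only keeps the main thread alive so frames can keep arriving
        CountDownLatch latch = new CountDownLatch(1);

        WebSocketTransport webSocketTransport = new WebSocketTransport(new StandardWebSocketClient());
        SockJsClient sockJsClient = new SockJsClient(Collections.singletonList(webSocketTransport));
        WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);

        StompSessionHandler sessionHandler = new MyStompSessionHandler();
        // Credentials checked by the server on the CONNECT frame
        StompHeaders headers = new StompHeaders();
        headers.add("username", "aaaaaa");
        headers.add("password", "admin");
        ListenableFuture<StompSession> connect = stompClient.connect("ws://localhost:8080/web/webSocketServer", new WebSocketHttpHeaders(), headers, sessionHandler);
        try {
            // Block until the STOMP session is established
            StompSession stompSession = connect.get();

            // Subscribe to the one-to-many topic
            stompSession.subscribe("/topic/getResponse", new StompFrameHandler() {
                @Override
                public Type getPayloadType(StompHeaders stompHeaders) {
                    return Object.class;
                }

                @Override
                public void handleFrame(StompHeaders stompHeaders, Object o) {
                    // The default converter hands over the raw bytes of the frame body
                    String msg = new String((byte[]) o, StandardCharsets.UTF_8);
                    System.out.println("msg = " + msg);
                }
            });

            // Subscribe to the one-to-one queue of user "aaaaaa"
            stompSession.subscribe("/user/" + "aaaaaa"+ "/queue/getResponse", new StompFrameHandler() {
                @Override
                public Type getPayloadType(StompHeaders stompHeaders) {
                    return Object.class;
                }

                @Override
                public void handleFrame(StompHeaders stompHeaders, Object o) {
                    String msg = new String((byte[]) o, StandardCharsets.UTF_8);
                    System.out.println("msg = " + msg);
                }
            });

            // Send one message to each handler
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("name", "123123");
            stompSession.send("/sendTopic",jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8));
            stompSession.send("/sendUser",jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8));

        } catch (Exception e) {
            e.printStackTrace();
        }
        latch.await();


    }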

    static class MyStompSessionHandler extends StompSessionHandlerAdapter {

        public MyStompSessionHandler() {

        }

        @Override
        public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
            System.out.println("StompHeaders: " + connectedHeaders.toString());
        }

        @Override
        public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
            System.out.println("handleException===>" + exception.getMessage());
        }

        @Override
        public void handleTransportError(StompSession session, Throwable exception) {
            System.out.println("handleTransportError===>" + exception.getMessage());
        }
    }
}

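Remember: to support Android/Java clients, you need to add another addEndpoint to the WebSocket configuration. This code closely mirrors the web client, so I did not bother adding comments.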
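Here is a sketch of what the extra endpoint could look like, assuming native clients speak plain WebSocket rather than SockJS. The /webSocketServerNative path is a hypothetical name, and the handshake interceptor from the web endpoint is left out for brevity:

```java
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Web clients: SockJS endpoint, as in the post
        // (on Spring Framework 5.3+, setAllowedOriginPatterns("*") may be needed instead)
        registry.addEndpoint("/webSocketServer")
                .setAllowedOrigins("*")
                .withSockJS();

        // Native clients: plain WebSocket endpoint without the SockJS layer.
        // The path is a hypothetical name chosen for this sketch.
        registry.addEndpoint("/webSocketServerNative")
                .setAllowedOrigins("*");
    }
}
```

A client library without SockJS support would connect to the second path. The SockJsClient-based client above targets the SockJS endpoint.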


Solving the cross-origin (CORS) problem

@Configuration
public class CorsConfig {
    private CorsConfiguration buildConfig() {
        CorsConfiguration corsConfiguration = new CorsConfiguration();
        corsConfiguration.addAllowedOrigin("*");
        corsConfiguration.addAllowedHeader("*");
        corsConfiguration.addAllowedMethod("*");
        return corsConfiguration;
    }

    @Bean
    public CorsFilter corsFilter() {
        UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
        source.registerCorsConfiguration("/**", buildConfig());
        return new CorsFilter(source);
    }
}

voidm
