WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议 …
如果连接被随意创建而不加以管理,就会出现 Broken pipe 错误。
表面上看只是单纯报错,并没有什么功能缺陷,但实际上随着请求数增加,很容易导致系统崩溃。这里划重点。
出现原因有很多种,目前我这边出现的原因,是因为客户端已关闭连接,服务端还持续推送导致。
下面将使用 Spring Boot 集成的 WebSocket。
导入maven
首先springboot版本
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.8.RELEASE</version>
</parent>
集成websocket
<!-- 加上 web 依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
主要用来监控客户端握手连接进来以及挥手关闭连接
需要一个管理socket的类
package com.li.manager;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.WebSocketSession;

import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Static registry of WebSocket sessions, keyed by a caller-chosen string.
 *
 * <p>All state lives in a single {@link ConcurrentHashMap}, so individual
 * operations are safe to call from multiple threads. Registering a second
 * session under a key that is already present replaces the previous entry.
 */
@Slf4j
public class SocketManager {

    /** Key -> session currently registered under that key. */
    private static final ConcurrentMap<String, WebSocketSession> MANAGER =
            new ConcurrentHashMap<String, WebSocketSession>();

    /**
     * Registers {@code webSocketSession} under {@code key}, replacing any
     * session previously stored under the same key.
     *
     * @param key              registry key; must not be {@code null}
     * @param webSocketSession session to store; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    public static void add(String key, WebSocketSession webSocketSession) {
        Objects.requireNonNull(key, "key");
        Objects.requireNonNull(webSocketSession, "webSocketSession");
        log.info("新添加websocket连接 {} ", key);
        MANAGER.put(key, webSocketSession);
    }

    /**
     * Removes whatever session is registered under {@code key}.
     * A {@code null} key is ignored (the backing map does not accept null keys).
     *
     * @param key registry key, may be {@code null}
     */
    public static void remove(String key) {
        if (key == null) {
            return;
        }
        log.info("移除websocket连接 {} ", key);
        MANAGER.remove(key);
    }

    /**
     * Removes the entry for {@code key} only if it still maps to
     * {@code webSocketSession}. Use this when a connection closes so that a
     * newer session registered under the same key is not evicted by mistake.
     *
     * @param key              registry key, may be {@code null}
     * @param webSocketSession the session expected to be registered, may be {@code null}
     * @return {@code true} if an entry was removed
     */
    public static boolean remove(String key, WebSocketSession webSocketSession) {
        if (key == null || webSocketSession == null) {
            return false;
        }
        boolean removed = MANAGER.remove(key, webSocketSession);
        if (removed) {
            log.info("移除websocket连接 {} ", key);
        }
        return removed;
    }

    /**
     * Looks up the session registered under {@code key}.
     *
     * @param key registry key, may be {@code null}
     * @return the registered session, or {@code null} if there is none or the key is {@code null}
     */
    public static WebSocketSession get(String key) {
        if (key == null) {
            return null;
        }
        log.info("获取websocket连接 {}", key);
        return MANAGER.get(key);
    }
}
package com.li.factory;

import com.li.manager.SocketManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.WebSocketHandlerDecorator;
import org.springframework.web.socket.handler.WebSocketHandlerDecoratorFactory;

import java.security.Principal;

/**
 * Wraps the WebSocket handler so that connection open/close events keep
 * {@link SocketManager} in sync: a session is registered under its principal
 * name when the connection is established and unregistered when it closes.
 */
@Component
@Slf4j
public class WebSocketDecoratorFactory implements WebSocketHandlerDecoratorFactory {

    /**
     * Decorates {@code handler} with session bookkeeping.
     *
     * @param handler the handler to wrap
     * @return a decorator that delegates every callback to {@code handler}
     */
    @Override
    public WebSocketHandler decorate(WebSocketHandler handler) {
        return new WebSocketHandlerDecorator(handler) {

            @Override
            public void afterConnectionEstablished(WebSocketSession session) throws Exception {
                log.info("有人连接啦 sessionid = {}", session.getId());
                Principal principal = session.getPrincipal();
                // Only sessions that carry a named principal are cached.
                if (principal != null && principal.getName() != null) {
                    log.info("key = {} 存入", principal.getName());
                    SocketManager.add(principal.getName(), session);
                }
                super.afterConnectionEstablished(session);
            }

            @Override
            public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
                log.info("有人退出连接啦 sessionid = {}", session.getId());
                try {
                    Principal principal = session.getPrincipal();
                    if (principal != null && principal.getName() != null) {
                        String key = principal.getName();
                        WebSocketSession cached = SocketManager.get(key);
                        // The same principal may have reconnected in the meantime; only
                        // evict the entry when it still refers to the session being closed.
                        if (cached != null && session.getId().equals(cached.getId())) {
                            SocketManager.remove(key);
                        }
                    }
                } finally {
                    // Always let the wrapped handler see the close event.
                    super.afterConnectionClosed(session, closeStatus);
                }
            }
        };
    }
}
getId():返回唯一的会话标识符。
getPrincipal():经过身份验证时返回 Principal 实例,未经过身份验证时返回 null。
Principal:委托人的抽象概念,可以是公司 id、名字、用户唯一识别 token 等。
package com.li.handler;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

import javax.servlet.http.HttpServletRequest;
import java.security.Principal;
import java.util.Map;

/**
 * Handshake handler that derives the connection's {@link Principal} from the
 * {@code token} request parameter. Request data such as a token or an HTTP
 * session can be inspected here to decide who the connecting user is.
 */
@Slf4j
@Component
public class PrincipalHandshakeHandler extends DefaultHandshakeHandler {

    /**
     * Builds the principal for a connecting client.
     *
     * <p>The value returned by {@link Principal#getName()} is what later shows
     * up as {@code WebSocketSession.getPrincipal().getName()}. This sample reads
     * it from the URL/query parameter {@code token}; a cookie or header could be
     * used instead.
     *
     * @param request    the handshake request
     * @param wsHandler  the handler the connection will be bound to (unused)
     * @param attributes handshake attributes (unused)
     * @return a principal named after the token, or {@code null} when the request
     *         is not servlet-based or carries no non-blank token
     */
    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler,
                                      Map<String, Object> attributes) {
        if (!(request instanceof ServletServerHttpRequest)) {
            return null;
        }
        HttpServletRequest httpRequest = ((ServletServerHttpRequest) request).getServletRequest();
        String token = httpRequest.getParameter("token");
        // Reject missing, empty and whitespace-only tokens alike.
        if (!StringUtils.hasText(token)) {
            return null;
        }
        return new TokenPrincipal(token);
    }

    /** Minimal principal whose name is the token supplied at handshake time. */
    private static final class TokenPrincipal implements Principal {

        private final String token;

        private TokenPrincipal(String token) {
            this.token = token;
        }

        @Override
        public String getName() {
            return token;
        }

        @Override
        public String toString() {
            return "TokenPrincipal[" + token + "]";
        }
    }
}
package com.li.config; import com.li.factory.websocketdecoratorfactory; import com.li.handler.principalhandshakehandler; import org.springframework.beans.factory.annotation.autowired; import org.springframework.context.annotation.configuration; import org.springframework.messaging.simp.config.messagebrokerregistry; import org.springframework.web.socket.config.annotation.abstractwebsocketmessagebrokerconfigurer; import org.springframework.web.socket.config.annotation.enablewebsocketmessagebroker; import org.springframework.web.socket.config.annotation.stompendpointregistry; import org.springframework.web.socket.config.annotation.websockettransportregistration; /** * websocketconfig配置 */ @configuration @enablewebsocketmessagebroker public class websocketconfig extends abstractwebsocketmessagebrokerconfigurer { @autowired private websocketdecoratorfactory websocketdecoratorfactory; @autowired private principalhandshakehandler principalhandshakehandler; @override public void registerstompendpoints(stompendpointregistry registry) { /** * myurl表示 你前端到时要对应url映射 */ registry.addendpoint("/myurl") .setallowedorigins("*") .sethandshakehandler(principalhandshakehandler) .withsockjs(); } @override public void configuremessagebroker(messagebrokerregistry registry) { /** * queue 点对点 * topic 广播 * user 点对点前缀 */ registry.enablesimplebroker("/queue", "/topic"); registry.setuserdestinationprefix("/user"); } @override public void configurewebsockettransport(websockettransportregistration registration) { registration.adddecoratorfactory(websocketdecoratorfactory); super.configurewebsockettransport(registration); } }
package com.li.controller;

import com.li.manager.SocketManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.socket.WebSocketSession;

import java.util.Map;

/**
 * Demo endpoints showing server-initiated pushes (HTTP-triggered) and
 * client-initiated STOMP messages.
 */
@RestController
@Slf4j
public class TestController {

    @Autowired
    private SimpMessagingTemplate template;

    /**
     * Pushes a greeting to one specific user. The front end must have connected
     * with that token, e.g. {@code var socket = new SockJS(host+'/myurl' + '?token=1234');}.
     *
     * @param token name of the target user; nothing is sent when it is missing
     *              or no open session is registered for it
     */
    @RequestMapping("/senduser")
    public void sendUser(String token) {
        log.info("token = {} ,对其发送您好", token);
        // Skip users without a live connection to avoid "broken pipe" errors.
        if (findOpenSession(token) != null) {
            template.convertAndSendToUser(token, "/queue/senduser", "您好");
        }
    }

    /**
     * Broadcast: the server pushes a message to every subscribed client.
     */
    @RequestMapping("/sendtopic")
    public void sendTopic() {
        template.convertAndSend("/topic/sendtopic", "大家晚上好");
    }

    /**
     * Receives a message sent by a client; the server only logs it.
     * {@code @MessageMapping} plays the role {@code @RequestMapping} plays for HTTP.
     *
     * @param message the message payload
     */
    @MessageMapping("/sendserver")
    public void sendServer(String message) {
        log.info("message:{}", message);
    }

    /**
     * Receives a message from one client and relays it to every subscriber of
     * the broadcast topic. The template could be used instead of {@code @SendTo}.
     *
     * @param message the message payload
     * @return the same message, forwarded to {@code /topic/sendtopic}
     */
    @MessageMapping("/sendalluser")
    @SendTo("/topic/sendtopic")
    public String sendAllUser(String message) {
        return message;
    }

    /**
     * Point-to-point chat. The client sends JSON with {@code name} (the target
     * user's token) and {@code message}; the target must have connected with
     * that token, e.g. {@code var socket = new SockJS(host+'/myurl' + '?token=4567');}.
     *
     * <p>{@code @Payload} is the messaging annotation for binding the message
     * body; the MVC {@code @RequestBody} annotation has no meaning on a
     * {@code @MessageMapping} method.
     *
     * @param map decoded JSON payload containing {@code name} and {@code message}
     */
    @MessageMapping("/sendmyuser")
    public void sendMyUser(@Payload Map<String, String> map) {
        log.info("map = {}", map);
        String name = map.get("name");
        WebSocketSession webSocketSession = findOpenSession(name);
        if (webSocketSession != null) {
            log.info("sessionid = {}", webSocketSession.getId());
            template.convertAndSendToUser(name, "/queue/senduser", map.get("message"));
        }
    }

    /**
     * Returns the registered session for {@code user} if one exists and is
     * still open, otherwise {@code null}.
     */
    private WebSocketSession findOpenSession(String user) {
        if (user == null) {
            return null;
        }
        WebSocketSession session = SocketManager.get(user);
        return (session != null && session.isOpen()) ? session : null;
    }
}
可以直接启动
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8" />
    <title>spring boot websocket+广播式</title>
</head>
<body>
<noscript>
    <h2 style="color:#ff0000">貌似你的浏览器不支持websocket</h2>
</noscript>
<div>
    <div>
        <button id="connect" onclick="connect()">连接</button>
        <button id="disconnect" onclick="disconnect();">断开连接</button>
    </div>
    <div id="conversationdiv">
        <label>输入你的名字</label> <input type="text" id="name" />
        <br>
        <label>输入消息</label> <input type="text" id="messgae" />
        <button id="send" onclick="send();">发送</button>
        <p id="response"></p>
    </div>
</div>
<script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
<script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script>
<script type="text/javascript">
    var stompClient = null;
    // Address of the gateway in front of the WebSocket service.
    var host = "http://127.0.0.1:8888";

    // Toggles the buttons / conversation panel and clears the last response.
    function setConnected(connected) {
        document.getElementById('connect').disabled = connected;
        document.getElementById('disconnect').disabled = !connected;
        document.getElementById('conversationdiv').style.visibility = connected ? 'visible' : 'hidden';
        $('#response').text('');
    }

    // sendUser ***********************************************
    // Connects as user "4567" and listens on the per-user queue.
    function connect() {
        // host + endpoint path; must match addEndpoint in the server config.
        var socket = new SockJS(host + '/myurl' + '?token=4567');
        stompClient = Stomp.over(socket);
        stompClient.connect({}, function(frame) {
            setConnected(true);
            console.log('connected:' + frame);
            // Destination to listen on, and the callback for incoming messages.
            stompClient.subscribe('/user/queue/senduser', function(response) {
                showResponse(response.body);
            });
        });
    }

    /* Broadcast variant: connect anonymously and listen on the topic.
    function connect() {
        // host + endpoint path; must match addEndpoint in the server config.
        var socket = new SockJS(host + '/myurl');
        stompClient = Stomp.over(socket);
        stompClient.connect({}, function(frame) {
            setConnected(true);
            console.log('connected:' + frame);
            // Destination to listen on, and the callback for incoming messages.
            stompClient.subscribe('/topic/sendtopic', function(response) {
                showResponse(response.body);
            });
        });
    }*/

    function disconnect() {
        if (stompClient != null) {
            stompClient.disconnect();
            stompClient = null;
        }
        setConnected(false);
        console.log("disconnected");
    }

    function send() {
        // Sending before the STOMP session is up would throw; bail out instead.
        if (stompClient == null || !stompClient.connected) {
            console.log("not connected");
            return;
        }
        var name = $('#name').val();
        var message = $('#messgae').val();

        /* Client -> server only:
        stompClient.send("/sendserver", {}, message);
        */

        /* Client -> everyone subscribed to /topic/sendtopic (the broadcast
           variant of connect() must be active):
        stompClient.send("/sendalluser", {}, message);
        */

        /* Point-to-point: open this page more than once with different tokens
         * (e.g. token=1234 and token=4567), then enter the other page's token
         * as "name" to address that user.
         */
        stompClient.send("/sendmyuser", {}, JSON.stringify({name: name, message: message}));
    }

    // Renders a received message as plain text (never as HTML).
    function showResponse(message) {
        $('#response').text(message);
    }
</script>
</body>
</html>
版权声明:本文为不会代码的小白原创文章,转载需添加小白地址 :
如对本文有疑问, 点击进行留言回复!!
浅谈Java如何实现一个基于LRU时间复杂度为O(1)的缓存
JDK1.6“新“特性Instrumentation之JavaAgent(推荐)
before社区电量是什么意思 Before社区电量获得方法
网友评论