当前位置: 移动技术网 > IT编程>开发语言>Java > websocket+rabbitmq实战

websocket+rabbitmq实战

2019年04月02日  | 移动技术网IT编程  | 我要评论

异世为帝之崇祯别传,重庆经典论坛,2013146

1. websocket+rabbitmq实战

1.1. 前言

  接到的需求是后台定向给指定web登录用户推送消息,且可能同一账号会登录多个客户端都要接收到消息

1.2. 遇坑

  1. 基于springboot环境搭建的websocket+rabbitmq,搭建完成后发现websocket每隔一段时间会断开,看网上有人因为nginx的连接超时机制断开,而我这似乎是因为长连接空闲时间太长而断开
  2. 经过测试,如果一直保持每隔段时间发送消息,那么连接不会断开,所以我采用了断开重连机制,分三种情况
    1. 服务器正常,客户端正常且空闲时间不超过1分钟,则情况正常,超过一分钟会断线,前端发起请求重连
    2. 服务器正常,客户端关闭或注销,服务器正常收到通知,去除对应客户端session
    3. 服务器异常,客户端正常,客户端发现连不上服务器会尝试重连3次,3次都连不上放弃重连
  3. rabbitmq定向推送,按需求需要一台机器对应一批用户,所以定制化需要服务启动的时候定向订阅该ip对应的队列名,简单说就是动态队列名的设定,所以又复杂了点,不能直接在注解写死。同时因为使用的apollo配置中心,同一集群应该相同的配置,所以也不能通过提取配置的方式设定值,为了这个点设置apollo的集群方式有点小题大做,所以采用动态读取数据库对应的ip取出对应的队列名。
  4. 部署线上tomcat的话,不需要加上一块代码
/**
 * 使用tomcat启动无需配置
 */
//@configuration
//@conditionalonproperty(name="websocket.enabled",havingvalue = "true")
public class websocketconfig {
    @bean
    public serverendpointexporter serverendpointexporter() {
        return new serverendpointexporter();
    }
}

1.3. 正式代码

1.3.1. rabbimq部分

  1. application.properties配置
spring.rabbitmq.addresses = i.tzxylao.com:5672
spring.rabbitmq.username = admin
spring.rabbitmq.password = 123456
spring.rabbitmq.virtual-host = /
spring.rabbitmq.connection-timeout = 15000
  1. 交换机和队列配置
/**
 * @author laoliangliang
 * @date 2019/3/29 11:41
 */
@configuration
@conditionalonproperty(name="websocket.enabled",havingvalue = "true")
public class rabbitmqconfig {

    final public static string exchangename = "websocketexchange";

    /**
     * 创建交换器
     */
    @bean
    fanoutexchange exchange() {
        return new fanoutexchange(exchangename);
    }

    @bean
    public queue queue(){
        return new queue(orderqueuename());
    }

    @bean
    binding bindingexchangemessage(queue queue,fanoutexchange exchange) {
        return bindingbuilder.bind(queue).to(exchange);
    }

    @bean
    public simplemessagelistenercontainer messagelistenercontainer(orderreceiver orderreceiver, @qualifier("rabbitconnectionfactory") cachingconnectionfactory cachingconnectionfactory){
        simplemessagelistenercontainer container = new simplemessagelistenercontainer(cachingconnectionfactory);
        // 监听队列的名称
        container.setqueuenames(orderqueuename());
        container.setexposelistenerchannel(true);
        // 设置每个消费者获取的最大消息数量
        container.setprefetchcount(100);
        // 消费者的个数
        container.setconcurrentconsumers(1);
        // 设置确认模式为自动确认
        container.setacknowledgemode(acknowledgemode.auto);
        container.setmessagelistener(orderreceiver);
        return container;
    }


    /**
     * 在这里写获取订单队列名的具体过程
     * @return
     */
    public string orderqueuename(){
        return "orderchannel";
    }
}
  1. 消息监听类
/**
 * @author laoliangliang
 * @date 2019/3/29 11:38
 */
@component
@slf4j
@conditionalonproperty(name="websocket.enabled",havingvalue = "true")
public class orderreceiver implements channelawaremessagelistener {

    @autowired
    private mywebsocket mywebsocket;

    @override
    public void onmessage(message message, channel channel) throws exception {
        byte[] body = message.getbody();
        log.info("接收到消息:" + new string(body));
        try {
            mywebsocket.sendmessage(new string(body));
        } catch (ioexception e) {
            log.error("send rabbitmq message error", e);
        }
    }
}

1.3.2. websocket部分

  1. 配置服务端点
@configuration
@conditionalonproperty(name="websocket.enabled",havingvalue = "true")
public class websocketconfig {
    @bean
    public serverendpointexporter serverendpointexporter() {
        return new serverendpointexporter();
    }
}
  1. 核心代码
/**
 * @author laoliangliang
 * @date 2019/3/28 14:40
 */
public abstract class abstractwebsocket {

    protected static map<string, copyonwritearrayset<session>> sessionstore = new hashmap<>();

    public void sendmessage(string message) throws ioexception {
        list<string> usercodes = beforesendmessage();
        for (string usercode : usercodes) {
            copyonwritearrayset<session> sessions = sessionstore.get(usercode);
            //阻塞式的(同步的)
            if (sessions !=null && sessions.size() != 0) {
                for (session s : sessions) {
                    if (s != null) {
                        s.getbasicremote().sendtext(message);
                    }
                }
            }
        }
    }

    /**
     * 删选给谁发消息
     * @return
     */
    protected abstract list<string> beforesendmessage();

    protected void clearsession(session session) {
        collection<copyonwritearrayset<session>> values = sessionstore.values();
        for (copyonwritearrayset<session> sessions : values) {
            for (session session1 : sessions) {
                if (session.equals(session1)) {
                    sessions.remove(session);
                }
            }
        }
    }
}
@serverendpoint(value = "/websocket")
@component
@conditionalonproperty(name="websocket.enabled",havingvalue = "true")
public class mywebsocket extends abstractwebsocket {


    private static logger log = logmanager.getlogger(mywebsocket.class);

    @autowired
    private amqptemplate amqptemplate;

    @postconstruct
    public void init() {
        /*scheduledexecutorservice executorservice = executors.newscheduledthreadpool(1);
        executorservice.scheduleatfixedrate(new runnable() {

            int i = 0;

            @override
            public void run() {
                amqptemplate.convertandsend(rabbitfanout.exchangename, "",("msg num : " + i).getbytes());
                i++;
            }
        }, 50, 1, timeunit.seconds);*/
    }

    /**
     * 连接建立成功调用的方法
     *
     * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    @onopen
    public void onopen(session session) throws timeoutexception {
        log.info("websocket connect");
        //10m
        session.setmaxtextmessagebuffersize(10485760);
    }

    /**
     * 连接关闭调用的方法
     */
    @onclose
    public void onclose(session session) {
        clearsession(session);
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     * @param session 可选的参数
     */
    @onmessage
    public void onmessage(string message, session session) {
        log.info("from client request:" + message);
        copyonwritearrayset<session> sessions = sessionstore.get(message);
        if (sessions == null) {
            sessions = new copyonwritearrayset<>();
        }
        sessions.add(session);
        sessionstore.put(message, sessions);
    }

    /**
     * 发生错误时调用
     *
     * @param session
     * @param error
     */
    @onerror
    public void onerror(session session, throwable error) {
        clearsession(session);
    }

    /**
     * 这里返回需要给哪些用户发送消息
     * @return
     */
    @override
    protected list<string> beforesendmessage() {
        //todo 给哪些用户发送消息
        return lists.newarraylist("6");
    }
}

1.3.3. 前端代码

var websocket = null;
var reconnectcount = 0;
function connectsocket(){
    var data = basicconfig();
    if(data.websocketenable !== "true"){
        return;
    }
    //判断当前浏览器是否支持websocket
    if ('websocket' in window) {
        if(data.localip && data.localip !== "" && data.serverport && data.serverport !== ""){
            websocket = new websocket("ws://"+data.localip+":"+data.serverport+data.servercontextpath+"/websocket");
        }else{
            return;
        }
    }else {
        alert('当前浏览器 不支持websocket')
    }

    //连接发生错误的回调方法
    websocket.onerror = function () {
        console.log("连接发生错误");
    };

    //连接成功建立的回调方法
    websocket.onopen = function () {
        reconnectcount = 0;
        console.log("连接成功");
    };

    //接收到消息的回调方法,此处添加处理接收消息方法,当前是将接收到的信息显示在网页上
    websocket.onmessage = function (event) {
        console.log("receive message:" + event.data);
    };

    //连接关闭的回调方法
    websocket.onclose = function () {
        console.log("连接关闭,如需登录请刷新页面。");
        if(reconnectcount === 3) {
            reconnectcount = 0;
            return;
        }
        connectsocket();
        basicconfig();
        reconnectcount++;
    };

    //添加事件监听
    websocket.addeventlistener('open', function () {
        websocket.send(data.usercode);
    });


    //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
    window.onbeforeunload = function () {
        console.log("closewebsocket");
    };


}

connectsocket();

function basicconfig(){
        var result = {};
        $.ajax({
            type: "post",
            async: false,
            url: "${request.contextpath}/basicconfig",
            data: {},
            success: function (data) {
                result = data;
            }
        });
        return result;
    }

1.3.4. 后端提供接口

    @apolloconfig
    private config config;

    @requestmapping(value = {"/basicconfig"})
    @responsebody
    public map<string, object> getusercode(httpsession session) {
        map<string, object> map = new hashmap<>(2);
        map.put("usercode",string.valueof(session.getattribute("usercode")));
        string websocketenable = config.getproperty("websocket.enabled", "false");
        string servercontextpath  = config.getproperty("server.context-path", "");
        map.put("websocketenable", websocketenable);
        map.put("servercontextpath", servercontextpath);

        string localip = config.getproperty("local.ip", "");
        string serverport = config.getproperty("server.port", "80");

        map.put("localip", localip);
        map.put("serverport", serverport);
        return map;
    }

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网