当前位置: 移动技术网 > IT编程>开发语言>Java > 消息队列

消息队列

2019年07月08日  | 移动技术网IT编程  | 我要评论

消息队列

前言:

说实话,最近还是比较忙的,手上素材倒是一大把,但是大多只是初步整理了。但是博客这种东西还是要写的,果然后面还是要放低一下排版要求(扩展性的一些东西也少提一些)。

简介:

消息队列这个东西,其实网上的资料还是很多的。我就简单说一些自己的认识与源代码哈。

演变:

我是很喜欢了解技术演进的,因为演进的过程展现了前辈们的智慧。

最早的程序串行执行就不说了。

程序调用中的方法调用,往往调用方与被调用方都存在与同一内存空间(从java角度说,都是在同一jvm中),所以方法调用的逻辑不会太复杂。简单来说,就是调用方(java中其实就是目标对象)将被调用方压入java虚拟机栈,从而执行(详见jvm)。或者等我什么时候,把我有关jvm的笔记贴出来(嘿嘿)。

后来呢,就是出现了对非本地jvm方法调用的需求(举个例子,我需要调用第三方的方法,如果每次都要双方都写一个专门的处理服务(在当时,也许接口更为准确),比较麻烦),那么就有了rpc与rmi的一个需要。那么在java中就出现了一个stub的技术,定义好后,相关方法就像调用本地一样(详见《head first java》相关章节)。当然了,这个时候已经有了中间件的概念了,所以也就有了corba等框架。谈到中间件,感兴趣的,可以去查询一下当时主流的中间件分类(如rpc,rmi,mom,tpm,orb)。

那么到了现在呢,分布式系统的通信可以按照同步与异步分为两大支柱。之所以这么理解,是因为分布式系统往往同步通信与异步通信都是需要的。简单提一下,同步通信业务逻辑相对简单,实现快速,可以实时获得回应,但耦合度较高。异步通信耦合度低,并可以进行消息堆积,消峰,但无法实时获取回应,业务逻辑复杂,从而提高系统复杂度(尤其当一条业务线与多层异步逻辑)等。之后有机会,我会举例细述。

当然了,在本篇中,只简单谈一下异步通信的主流实现-消息队列。

选择:

选择方面,我就不多说了,目前只用过rabbitmq,rocketmq,kafka。网上有关消息队列选择的文章很多,很细致,我就不赘述了。

代码实现:

这里贴出来的都是实际生产代码(如果内部版本也算的话,嘿嘿),所以如果有一些不是很熟悉的类,请查看import,是否是项目自身的类。或者也可以直接询问我。

初步实现:

这里的初步实现,是根据rabbitmq的原生方法进行编写(详细参考:《rabbitmq实战指南》第一章的两个代码清单及第二章的相关解释)。

producer:​​

package com.renewable.gateway.rabbitmq.producer;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;
import com.rabbitmq.client.messageproperties;
import com.renewable.gateway.pojo.terminal;
import com.renewable.gateway.util.jsonutil;
import com.renewable.gateway.util.propertiesutil;
import org.springframework.stereotype.component;

import java.io.ioexception;
import java.util.concurrent.timeoutexception;

import static com.renewable.gateway.common.constant.rabbitmqconstant.*;

/**
 * @description:
 * @author: jarry
 */
@component("terminalproducer")
public class terminalproducer {

    private static final string ip_address = propertiesutil.getproperty(rabbitmq_host);
    private static final int port = integer.parseint(propertiesutil.getproperty(rabbitmq_port));
    private static final string user_name = propertiesutil.getproperty(rabbitmq_user_name);
    private static final string user_password = propertiesutil.getproperty(rabbitmq_user_password);

    private static final string terminal_config_terminal2centcontrol_exchange = "exchange-terminal-config-terminal2centcontrol";
    private static final string terminal_config_terminal2centcontrol_queue = "queue-terminal-config-terminal2centcontrol";
    private static final string terminal_config_terminal2centcontrol_routinetype = "topic";
    private static final string terminal_config_terminal2centcontrol_bindingkey = "terminal.config.terminal2centcontrol";
    private static final string terminal_config_terminal2centcontrol_routingkey = "terminal.config.terminal2centcontrol";

    public static void sendterminalconfig(terminal terminal) throws ioexception, timeoutexception, interruptedexception {

        connectionfactory factory = new connectionfactory();
        factory.sethost(ip_address);
        factory.setport(port);
        factory.setusername(user_name);
        factory.setpassword(user_password);

        connection connection = factory.newconnection();
        channel channel = connection.createchannel();
        channel.exchangedeclare(terminal_config_terminal2centcontrol_exchange, terminal_config_terminal2centcontrol_routinetype, true, false, null);
        channel.queuedeclare(terminal_config_terminal2centcontrol_queue, true, false, false, null);
        channel.queuebind(terminal_config_terminal2centcontrol_queue, terminal_config_terminal2centcontrol_exchange, terminal_config_terminal2centcontrol_bindingkey);

        string terminalstr = jsonutil.obj2string(terminal);
        channel.basicpublish(terminal_config_terminal2centcontrol_exchange, terminal_config_terminal2centcontrol_routingkey, messageproperties.persistent_text_plain, terminalstr.getbytes());

        channel.close();
        connection.close();
    }
}

consumer:

package com.renewable.gateway.rabbitmq.consumer;

import com.rabbitmq.client.*;
import com.renewable.gateway.common.guavacache;
import com.renewable.gateway.common.serverresponse;
import com.renewable.gateway.pojo.terminal;
import com.renewable.gateway.service.iterminalservice;
import com.renewable.gateway.util.jsonutil;
import com.renewable.gateway.util.propertiesutil;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;

import javax.annotation.postconstruct;
import java.io.ioexception;
import java.util.concurrent.timeoutexception;

import static com.renewable.gateway.common.constant.cacheconstant.terminal_mac;
import static com.renewable.gateway.common.constant.rabbitmqconstant.*;

/**
 * @description:
 * @author: jarry
 */
@component
public class terminalconsumer {

    @autowired
    private iterminalservice iterminalservice;

    private static final string terminal_config_centcontrol2terminal_exchange = "exchange-terminal-config-centcontrol2terminal";
    private static final string terminal_config_centcontrol2terminal_queue = "queue-terminal-config-centcontrol2terminal";
    private static final string terminal_config_centcontrol2terminal_routinetype = "topic";
    private static final string terminal_config_centcontrol2terminal_bindingkey = "terminal.config.centcontrol2terminal";

    @postconstruct
    public void messageonterminal() throws ioexception, timeoutexception, interruptedexception {
        address[] addresses = new address[]{
                new address(propertiesutil.getproperty(rabbitmq_host))
        };
        connectionfactory factory = new connectionfactory();
        factory.setusername(propertiesutil.getproperty(rabbitmq_user_name));
        factory.setpassword(propertiesutil.getproperty(rabbitmq_user_password));

        connection connection = factory.newconnection(addresses);
        final channel channel = connection.createchannel();
        channel.basicqos(64);   // 设置客户端最多接收未ack的消息个数,避免客户端被冲垮(常用于限流)
        consumer consumer = new defaultconsumer(channel) {


            @override
            public void handledelivery(string consumertag,
                                       envelope envelope,
                                       amqp.basicproperties properties,
                                       byte[] body) throws ioexception {
                // 1.接收数据,并反序列化出对象
                terminal receiveterminalconfig = jsonutil.string2obj(new string(body), terminal.class);

                // 2.验证是否是该终端的消息的消息     // 避免ack其他终端的消息
                if (receiveterminalconfig.getmac() == guavacache.getkey(terminal_mac)) {
                    // 业务代码
                    serverresponse response = iterminalservice.receiveterminalfromrabbitmq(receiveterminalconfig);
                    if (response.issuccess()) {
                        channel.basicack(envelope.getdeliverytag(), false);
                    }
                }
            }
        };
        channel.basicconsume(terminal_config_centcontrol2terminal_queue, consumer);
        // 等回调函数执行完毕后,关闭资源
        // 想了想还是不关闭资源,保持一个监听的状态,从而确保配置的实时更新
        //        timeunit.seconds.sleep(5);
        //        channel.close();
        //        connection.close();
    }
}

小结:

这是早期写的一个demo代码,是直接参照源码的。如果是学习rabbitmq的话,还是建议手写一下这种比较原始的程序,了解其中每个方法的作用,从而理解rabbitmq的思路。如果条件允许的话,还可以查看一下rabbitmq的底层通信协议-amqp(如果不方便下载,也可以私聊我)。

当然,此处可以通过@value直接导入相关配置(乃至到了springcloud后,可以通过@refreshscope等实现配置自动更新)。

与spring集成:

producer:

package com.renewable.terminal.rabbitmq.producer;

import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;
import com.rabbitmq.client.messageproperties;
import com.renewable.terminal.pojo.terminal;
import com.renewable.terminal.util.jsonutil;
import org.springframework.stereotype.component;

import java.io.ioexception;
import java.util.concurrent.timeoutexception;

/**
 * @description:
 * @author: jarry
 */
@component("terminalproducer")
public class terminalproducer {

    private static string rabbitmqhost = "47.92.249.250";
    private static string rabbitmquser = "admin";
    private static string rabbitmqpassword = "123456";
    private static string rabbitmqport = "5672";

    private static final string ip_address = rabbitmqhost;
    private static final int port = integer.parseint(rabbitmqport);
    private static final string user_name = rabbitmquser;
    private static final string user_password = rabbitmqpassword;

    private static final string terminal_config_terminal2centcontrol_exchange = "exchange-terminal-config-terminal2centcontrol";
    private static final string terminal_config_terminal2centcontrol_queue = "queue-terminal-config-terminal2centcontrol";
    private static final string terminal_config_terminal2centcontrol_routinetype = "topic";
    private static final string terminal_config_terminal2centcontrol_bindingkey = "terminal.config.terminal2centcontrol";
    private static final string terminal_config_terminal2centcontrol_routingkey = "terminal.config.terminal2centcontrol";

    public static void sendterminalconfig(terminal terminal) throws ioexception, timeoutexception, interruptedexception {

        connectionfactory factory = new connectionfactory();
        factory.sethost(ip_address);
        factory.setport(port);
        factory.setusername(user_name);
        factory.setpassword(user_password);

        connection connection = factory.newconnection();
        channel channel = connection.createchannel();
        channel.exchangedeclare(terminal_config_terminal2centcontrol_exchange, terminal_config_terminal2centcontrol_routinetype, true, false, null);
        channel.queuedeclare(terminal_config_terminal2centcontrol_queue, true, false, false, null);
        channel.queuebind(terminal_config_terminal2centcontrol_queue, terminal_config_terminal2centcontrol_exchange, terminal_config_terminal2centcontrol_bindingkey);

        string terminalstr = jsonutil.obj2string(terminal);
        channel.basicpublish(terminal_config_terminal2centcontrol_exchange, terminal_config_terminal2centcontrol_routingkey, messageproperties.persistent_text_plain, terminalstr.getbytes());

        channel.close();
        connection.close();
    }
}

consumer:

package com.renewable.terminal.rabbitmq.consumer;

import com.rabbitmq.client.*;
import com.renewable.terminal.init.serialsensorinit;
import com.renewable.terminal.init.terminalinit;
import com.renewable.terminal.common.guavacache;
import com.renewable.terminal.common.serverresponse;
import com.renewable.terminal.pojo.terminal;
import com.renewable.terminal.service.iterminalservice;
import com.renewable.terminal.util.jsonutil;
import lombok.extern.slf4j.slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.amqpheaders;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.messaging.handler.annotation.headers;
import org.springframework.messaging.handler.annotation.payload;
import org.springframework.stereotype.component;

import javax.annotation.postconstruct;
import java.io.ioexception;
import java.util.map;
import java.util.concurrent.timeoutexception;

import static com.renewable.terminal.common.constant.cacheconstant.terminal_id;
import static com.renewable.terminal.common.constant.cacheconstant.terminal_mac;

/**
 * @description:
 * @author: jarry
 */
@component
@slf4j
public class terminalconsumer {

    @autowired
    private iterminalservice iterminalservice;

    @autowired
    private serialsensorinit serialsensorinit;


    private static final string terminal_config_terminal2centcontrol_exchange = "exchange-terminal-config-centcontrol2terminal";
    private static final string terminal_config_terminal2centcontrol_queue = "queue-terminal-config-centcontrol2terminal";
    private static final string terminal_config_terminal2centcontrol_routinetype = "topic";
    private static final string terminal_config_terminal2centcontrol_bindingkey = "terminal.config.centcontrol2terminal";

    //todo_finished 2019.05.16 完成终端机terminalconfig的接收与判断(id是否为长随机数,是否需要重新分配)
    @rabbitlistener(bindings = @queuebinding(
            value = @queue(value = terminal_config_terminal2centcontrol_queue, declare = "true"),
            exchange = @exchange(value = terminal_config_terminal2centcontrol_exchange, declare = "true", type = terminal_config_terminal2centcontrol_routinetype),
            key = terminal_config_terminal2centcontrol_bindingkey
    ))
    @rabbithandler
    public void messageonterminal(@payload string terminalstr, @headers map<string, object> headers, channel channel) throws ioexception {

        terminal terminal = jsonutil.string2obj(terminalstr, terminal.class);
        if (terminal == null){
            log.info("consume the null terminal config !");
            long deliverytag = (long) headers.get(amqpheaders.delivery_tag);
            channel.basicack(deliverytag, false);
        }
        if (!guavacache.getkey(terminal_mac).equals(terminal.getmac())){
            log.info("refuse target terminal with mac({}) configure to this terminal with mac({}).",terminal.getmac(), guavacache.getkey(terminal_mac));
            return;
        }

        // 2.业务逻辑
        serverresponse response = iterminalservice.receiveterminalfromrabbitmq(terminal);
        log.info("start serialsensorinit");
        serialsensorinit.init();

        // 3.确认
        if (response.issuccess()) {
            long deliverytag = (long) headers.get(amqpheaders.delivery_tag);
            channel.basicack(deliverytag, false);
        }
    }
}

配置:

# rabbitmq 消费端配置
spring:
  rabbitmq:
    listener:
      simple:
        concurrency: 5
        max-concurrency: 10
        acknowledge-mode: manual
        # 限流
        prefetch: 1
    host: "localhost"
    port: 5672
    username: "admin"
    password: "123456"
    virtual-host: "/"
    connection-timeout: 15000

小结:

这里不得不赞一下spring,它通过提供rabbitmq地封装api-ampq,极大地简化了消息队列的代码。其实上述方法就是通过ampq的注解与yml配置来迅速实现rabbitmq的使用。

当然,这里还有很多的提升空间。比如说,通过@bean注解(建立目标配置)与公用方法提取,可以有效提高代码复用性。

简单扩展(与springstream集成):

这段代码并不是线上的代码,而是慕课网学习时留下的代码。主要实际生产中并没有使用springstream,但这确实是认识事件驱动模型的要给很好途径。

producer:

package com.imooc.order.message;

import org.springframework.cloud.stream.annotation.input;
import org.springframework.cloud.stream.annotation.output;
import org.springframework.messaging.messagechannel;
import org.springframework.messaging.subscribablechannel;

/**
 * @description:
 * @author: jarry
 */
public interface streamclient {

    string input = "mymessage";
    string input2 = "mymessageack";


    @input(streamclient.input)
    subscribablechannel input();

    @output(streamclient.input)
    messagechannel output();

    @input(streamclient.input2)
    subscribablechannel input2();

    @output(streamclient.input2)
    messagechannel output2();
}

package com.imooc.order;

import org.junit.assert;
import org.junit.test;
import org.springframework.amqp.core.amqptemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;

import java.util.date;

/**
 * @description:
 * @author: jarry
 */
@component
public class mqsendertest extends orderapplicationtests{

    @autowired
    private amqptemplate amqptemplate;

    @test
    public void send(){
        amqptemplate.convertandsend("myqueue", "now: " + new date());
        assert.assertnotnull(new date());
    }
}

consumer:

package com.imooc.order.message;

import com.imooc.order.dto.orderdto;
import lombok.extern.slf4j.slf4j;
import org.springframework.cloud.stream.annotation.enablebinding;
import org.springframework.cloud.stream.annotation.streamlistener;
import org.springframework.messaging.handler.annotation.sendto;
import org.springframework.stereotype.component;

/**
 * @description:
 * @author: jarry
 */
@component
@enablebinding(streamclient.class)
@slf4j
public class streamreceiver {

//  @streamlistener(streamclient.input)
//  public void process(object message){
//      log.info("streamreceiver: {}", message);
//  }

    @streamlistener(streamclient.input)
    // 增加以下注解,可以在input消息消费后,返回一个消息。说白了就是rabbitmq对消息消费后的确认回调函数(貌似叫这个,意思就这样,之后细查)
    @sendto(streamclient.input2)
    public string process(orderdto message){
        log.info("streamreceiver: {}", message);
        return "received.";
    }

    @streamlistener(streamclient.input2)
    public void process2(string message){
        log.info("streamreceiver2: {}", message);
    }
}

总结:

在学习技术的过程中,一方面不断地感受到自己对技术了解的不足,另一方面则是发现更重要的是系统设计中技术选型的权衡。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网