当前位置: 移动技术网 > IT编程>数据库>Mysql > Spring Cloud Alibaba(五)RocketMQ 异步通信实现

Spring Cloud Alibaba(五)RocketMQ 异步通信实现

2019年12月06日  | 移动技术网IT编程  | 我要评论
本文探讨如何使用 RocketMQ Binder 完成 Spring Cloud 应用消息的订阅和发布。 介绍 "RocketMQ" 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电 ...

本文探讨如何使用 rocketmq binder 完成 spring cloud 应用消息的订阅和发布。

介绍

rocketmq 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

rocketmq 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 apache 软件基金会,并于2017年9月25日成为 apache 的顶级项目。作为经历过多次阿里巴巴双十一这种“超级工程”的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。

rocketmq特点

  • 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式等特点
  • producer、consumer、队列都可以分布式
  • producer 向一些队列轮流发送消息,队列集合称为 topic,consumer 如果做广播消费,则一个 consumer 实例消费这个 topic 对应的所有队列,如果做集群消费,则多个 consumer 实例平均消费这个 topic 对应的队列集合
  • 能够保证严格的消息顺序
  • 支持拉(pull)和推(push)两种消息模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能力
  • 支持多种消息协议,如 jms、openmessaging 等
  • 较少的依赖

spring cloud stream

spring cloud stream 是一个构建消息驱动微服务的框架。

spring cloud stream 提供了消息中间件配置的统一抽象,推出了 pub/sub,consumer groups,semantics,stateful partition 这些统一的模型支持。

spring cloud stream 核心构件有:binders、bindings和message,应用程序通过 inputs 或者 outputs 来与 binder 交互,通过我们配置来 binding ,而 binder 负责与中间件交互,message为数据交换的统一数据规范格式。

  • binding: 包括 input binding 和 output binding。

binding 在消息中间件与应用程序提供的 provider 和 consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 provider 或 consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。

  • binder: 跟外部消息中间件集成的组件,用来创建 binding,各消息中间件都有自己的 binder 实现。

比如 kafka 的实现 kafkamessagechannelbinderrabbitmq 的实现 rabbitmessagechannelbinder 以及 rocketmq 的实现 rocketmqmessagechannelbinder

  • message:是 spring framework 中的一个模块,其作用就是统一消息的编程模型。

比如消息 messaging 对应的模型就包括一个消息体 payload 和消息头 header。

window搭建部署rocketmq

下载

下载出来解压到:d:\rocketmq 目录,目录最好不要带空格和太深,否则服务运行可能会报错

启动nameserver服务

在启动之前需要配置系统环境,不然会报错。

please set the rocketmq_home variable in your environment! 

系统环境变量名:rocketmq_home

根据你解压的目录配置环境变量,比如我的变量值为:d:\rocketmq

进入window命令窗口,进入d:\rocketmq\bin目录下,执行

start mqnamesrv.cmd

如上则nameserver启动成功。使用期间,窗口不要关闭。

启动broker服务

进入bin目录下,输入

start mqbroker.cmd -n localhost:9876

如上的 ip+port 是rocketmq的服务地址和端口。

运行如上命令,可能会报如下错误。找不到或无法加载主类

如果出此情况,打开bin-->runbroker.cmd,修改%classpath%成"%classpath%"

保存再次执行如上命令。执行成功后,提示boot success 代表成功。

示例

本示例实现三种消息的发布以及订阅接收。

创建 rocketmq 消息生产者

创建 ali-rocketmq-producer 工程,端口为:28081

  • pom.xml添加依赖
<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
         xsi:schemalocation="http://maven.apache.org/pom/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <parent>
        <artifactid>cloud-alibaba</artifactid>
        <groupid>com.easy</groupid>
        <version>1.0.0</version>
    </parent>

    <modelversion>4.0.0</modelversion>
    <artifactid>ali-rocketmq-producer</artifactid>
    <packaging>jar</packaging>

    <dependencies>

        <!--rocketmq依赖-->
        <dependency>
            <groupid>com.alibaba.cloud</groupid>
            <artifactid>spring-cloud-starter-stream-rocketmq</artifactid>
        </dependency>

        <!--web依赖-->
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-web</artifactid>
        </dependency>

        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-actuator</artifactid>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupid>org.springframework.boot</groupid>
                <artifactid>spring-boot-maven-plugin</artifactid>
            </plugin>
        </plugins>
    </build>

</project>
  • 配置 output 的 binding 信息并配合 @enablebinding 注解使其生效

application.yml配置

server:
  port: 28081

spring:
  application:
    name: ali-rocketmq-producer
  cloud:
    stream:
      rocketmq:
        binder:
          # rocketmq 服务器地址
          name-server: 127.0.0.1:9876
      bindings:
        output1: {destination: test-topic1, content-type: application/json}
        output2: {destination: test-topic2, content-type: application/json}

management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always

arproduceapplication.java

@springbootapplication
@enablebinding({mysource.class})
public class arproduceapplication {

    public static void main(string[] args) {
        springapplication.run(arproduceapplication.class, args);
    }
}
  • 消息生产者服务

mysource.java

package com.easy.arproduce;

import org.springframework.cloud.stream.annotation.output;
import org.springframework.messaging.messagechannel;

public interface mysource {

    @output("output1")
    messagechannel output1();

    @output("output2")
    messagechannel output2();
}

senderservice.java

package com.easy.arproduce;

import org.apache.rocketmq.spring.support.rocketmqheaders;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.messaging.message;
import org.springframework.messaging.messageheaders;
import org.springframework.messaging.support.messagebuilder;
import org.springframework.stereotype.service;
import org.springframework.util.mimetypeutils;

@service
public class senderservice {

    @autowired
    private mysource source;

    /**
     * 发送字符串
     *
     * @param msg
     */
    public void send(string msg) {
        message message = messagebuilder.withpayload(msg)
                .build();
        source.output1().send(message);
    }

    /**
     * 发送带tag的字符串
     *
     * @param msg
     * @param tag
     */
    public void sendwithtags(string msg, string tag) {
        message message = messagebuilder.withpayload(msg)
                .setheader(rocketmqheaders.tags, tag)
                .build();
        source.output1().send(message);
    }

    /**
     * 发送对象
     *
     * @param msg
     * @param tag
     * @param <t>
     */
    public <t> void sendobject(t msg, string tag) {
        message message = messagebuilder.withpayload(msg)
                .setheader(rocketmqheaders.tags, tag)
                .setheader(messageheaders.content_type, mimetypeutils.application_json)
                .build();
        source.output2().send(message);
    }
}

编写 testcontroller.java 控制器方便测试

package com.easy.arproduce;

import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.requestmethod;
import org.springframework.web.bind.annotation.restcontroller;

@restcontroller
@requestmapping(value = "test")
public class testcontroller {
    @autowired
    senderservice senderservice;

    @requestmapping(value = "/send", method = requestmethod.get)
    public string send(string msg) {
        senderservice.send(msg);
        return "字符串消息发送成功!";
    }

    @requestmapping(value = "/sendwithtags", method = requestmethod.get)
    public string sendwithtags(string msg) {
        senderservice.sendwithtags(msg, "tagstr");
        return "带tag字符串消息发送成功!";
    }

    @requestmapping(value = "/sendobject", method = requestmethod.get)
    public string sendobject(int index) {
        senderservice.sendobject(new foo(index, "foo"), "tagobj");
        return "object对象消息发送成功!";
    }
}

创建 rocketmq 消息消费者

创建 ali-rocketmq-consumer 工程,端口为:28082

  • pom.xml添加依赖
<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
         xsi:schemalocation="http://maven.apache.org/pom/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <parent>
        <artifactid>cloud-alibaba</artifactid>
        <groupid>com.easy</groupid>
        <version>1.0.0</version>
    </parent>

    <modelversion>4.0.0</modelversion>

    <artifactid>ali-rocketmq-consumer</artifactid>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupid>com.alibaba.cloud</groupid>
            <artifactid>spring-cloud-starter-stream-rocketmq</artifactid>
        </dependency>

        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-web</artifactid>
        </dependency>

        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-actuator</artifactid>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupid>org.springframework.boot</groupid>
                <artifactid>spring-boot-maven-plugin</artifactid>
            </plugin>
        </plugins>
    </build>
</project>

-配置 input 的 binding 信息并配合 @enablebinding 注解使其生效

application.yml配置

server:
  port: 28082

spring:
  application:
    name: ali-rocketmq-consumer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876 #rocketmq 服务地址
        bindings:
          input1: {consumer.orderly: true}  #是否排序
          input2: {consumer.tags: tagstr}   #订阅 带tag值为tagstr的字符串
          input3: {consumer.tags: tagobj}   #订阅 带tag值为tabobj的字符串
      bindings:
        input1: {destination: test-topic1, content-type: text/plain, group: test-group1, consumer.maxattempts: 1}
        input2: {destination: test-topic1, content-type: application/plain, group: test-group2, consumer.maxattempts: 1}
        input3: {destination: test-topic2, content-type: application/plain, group: test-group3, consumer.maxattempts: 1}

management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always

arconsumerapplication.java

package com.easy.arconsumer;

import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
import org.springframework.cloud.stream.annotation.enablebinding;

@springbootapplication
@enablebinding({mysource.class})
public class arconsumerapplication {

    public static void main(string[] args) {
        springapplication.run(arconsumerapplication.class, args);
    }
}
  • 消息消费者服务

mysource.java

package com.easy.arconsumer;

import org.springframework.cloud.stream.annotation.input;
import org.springframework.messaging.subscribablechannel;

public interface mysource {
    @input("input1")
    subscribablechannel input1();

    @input("input2")
    subscribablechannel input2();

    @input("input3")
    subscribablechannel input3();
}

receiveservice.java

package com.easy.arconsumer;

import lombok.extern.slf4j.slf4j;
import org.springframework.cloud.stream.annotation.streamlistener;
import org.springframework.messaging.handler.annotation.payload;
import org.springframework.stereotype.service;

@service
@slf4j
public class receiveservice {

    @streamlistener("input1")
    public void receiveinput1(string receivemsg) {
        log.info("input1 接收到了消息:" + receivemsg);
    }

    @streamlistener("input2")
    public void receiveinput2(string receivemsg) {
        log.info("input2 接收到了消息:" + receivemsg);
    }

    @streamlistener("input3")
    public void receiveinput3(@payload foo foo) {
        log.info("input3 接收到了消息:" + foo);
    }
}

使用示例

示例关联项目

本示例我们创建了两个项目实现

  • ali-rocketmq-producer:rocketmq 消息服务生产者,服务名:ali-rocketmq-producer,端口:28081

  • ali-rocketmq-consumer:rocketmq 消息服务消费者,服务名:ali-rocketmq-producer,端口:28082

运行示例测试

首先要启动ali-rocketmq-producer服务及ali-rocketmq-consumer服务

  • 访问消息服务生产者地址: http://localhost:28081/test/send?msg=yuntian

查看服务消费者控制台,输出

2019-12-04 15:37:47.859  info 6356 --- [messagethread_1] com.easy.arconsumer.receiveservice       : input1 接收到了消息:yuntian
2019-12-04 15:37:47.859  info 6356 --- [messagethread_1] s.b.r.c.rocketmqlistenerbindingcontainer : consume c0a8096e200818b4aac212cda70e0014 cost: 1 ms

表示字符串消费成功被input1消费了

  • 访问消息服务生产者地址: http://localhost:28081/test/sendwithtags?msg=tagyuntian

查看服务消费者控制台,输出

2019-12-04 15:38:09.586  info 6356 --- [messagethread_1] com.easy.arconsumer.receiveservice       : input2 接收到了消息:tagyuntian
2019-12-04 15:38:09.592  info 6356 --- [messagethread_1] com.easy.arconsumer.receiveservice       : input1 接收到了消息:tagyuntian
2019-12-04 15:38:09.592  info 6356 --- [messagethread_1] s.b.r.c.rocketmqlistenerbindingcontainer : consume c0a8096e200818b4aac212cdfcd30015 cost: 6 ms

表示带tag的字符串成功被input2和input1消费了,因为input1也订阅了test-topic1,并且没有我们没有加tag过滤,默认表示接收所有消息,所以也能成功接收tagyuntian字符串

  • 访问消息服务生产者地址: http://localhost:28081/test/sendobject?index=1

查看服务消费者控制台,输出

2019-12-04 15:41:15.285  info 6356 --- [messagethread_1] com.easy.arconsumer.receiveservice       : input3 接收到了消息:foo{id=1, bar='foo'}

表示input3成功接收到了tag带tagobj的对象消息了,而input1却没有输出消息,这是因为sendobject发布的消息走的是test-topic2消息管道,所以不会发布给input1及input2订阅者

资料

如您对本文有疑问或者有任何想说的,请 点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网