
Spring Cloud: Spring Cloud Stream and Message-Driven Microservices

November 24, 2019 | 移动技术网, IT Programming

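Spring Cloud Stream is a framework for building message-driven microservices. It builds on Spring Boot and uses Spring Integration to connect to message broker middleware (RabbitMQ, Kafka, and others), provides opinionated auto-configuration, and introduces three core concepts: publish-subscribe, consumer groups, and partitions.
An application interacts with a Spring Cloud Stream binder through input or output channels, and the binding between a channel and the binder is set up through configuration. The binder, in turn, is responsible for talking to the middleware.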
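
To make consumer groups and partitions more concrete, here is a minimal in-memory sketch in plain Java. It is not Spring Cloud Stream code: the class GroupedTopicSketch, its round-robin delivery inside a group, and the hash-modulo partition rule are all assumptions chosen for illustration. It shows that every group receives each message, that only one member of a group receives it, and that equal keys always land in the same partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of publish-subscribe with consumer groups. */
class GroupedTopicSketch {
    // group name -> inboxes of the consumer instances in that group
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
    // group name -> number of messages already delivered to that group
    private final Map<String, Integer> nextIndex = new HashMap<>();

    /** Adds one consumer instance to a group and returns its inbox. */
    List<String> subscribe(String group) {
        List<String> inbox = new ArrayList<>();
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(inbox);
        return inbox;
    }

    /** Every group sees the message, but only one member per group receives it. */
    void publish(String message) {
        for (Map.Entry<String, List<List<String>>> entry : groups.entrySet()) {
            List<List<String>> members = entry.getValue();
            int turn = nextIndex.getOrDefault(entry.getKey(), 0);
            members.get(turn % members.size()).add(message); // round-robin (assumed policy)
            nextIndex.put(entry.getKey(), turn + 1);
        }
    }

    /** Assumed partition rule: equal keys always map to the same partition. */
    static int partitionFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        GroupedTopicSketch topic = new GroupedTopicSketch();
        List<String> billing1 = topic.subscribe("billing");
        List<String> billing2 = topic.subscribe("billing");
        List<String> audit = topic.subscribe("audit");
        topic.publish("order-1");
        topic.publish("order-2");
        // prints: [order-1] [order-2] [order-1, order-2]
        System.out.println(billing1 + " " + billing2 + " " + audit);
    }
}
```

The two "billing" instances split the messages between them, while the single "audit" instance sees both, which is the behaviour a consumer group is meant to give.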

Development tool: IntelliJ IDEA 2019.2.3

I. Server

1. Create the project

In IDEA, create a new Spring Boot project named "spring-server". Select Spring Boot version 2.1.10, and on the dependencies screen check Spring Cloud Discovery -> Eureka Server.
The complete pom.xml is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-server</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-server</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2. Edit application.yml

Change the port to 8761, and disable both registering this instance with the Eureka server and fetching registry information from it.

server:
  port: 8761
eureka:
  client:
    register-with-eureka: false
    fetch-registry: false

3. Edit the startup class

Add the @EnableEurekaServer annotation.

package com.example.springserver;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer
public class SpringServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringServerApplication.class, args);
    }

}

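II. Message Producer

1. Create the project
In IDEA, create a new Spring Boot project named "spring-producer". Select Spring Boot version 2.1.10, and on the dependencies screen check Web -> Spring Web and Spring Cloud Discovery -> Eureka Discovery Client.
Open pom.xml and add the spring-cloud-starter-stream-rabbit dependency, which transitively brings in spring-cloud-stream and the RabbitMQ binder (spring-cloud-stream-binder-rabbit).
The complete pom.xml is as follows: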

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-producer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2. Edit application.yml

The pom.xml selects RabbitMQ, and by default the application connects to port 5672 on localhost, so the rabbitmq section below can be omitted.

server:
  port: 8081
spring:
  application:
    name: spring-producer
  # Optional: these values match the defaults
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/

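3. Write the sending service

The sendOrder method is annotated with @Output("myinput"), which declares an output channel named myinput. Calling the method returns that channel, and messages sent through it are delivered to myinput.
If the myinput argument is omitted, the method name is used as the channel name.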

package com.example.springproducer;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;

public interface SendService {
    @Output("myinput")
    SubscribableChannel sendOrder();
}
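
The naming rule described above (an explicit annotation value wins; otherwise the method name is used) can be illustrated with a small reflection sketch in plain Java. The @Out annotation and the two interfaces below are hypothetical stand-ins written for this example; they are not the Spring Cloud Stream types, and the framework's own resolution code may differ in detail.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class ChannelNameSketch {

    /** Hypothetical stand-in for @Output; not the Spring Cloud Stream annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Out {
        String value() default "";
    }

    /** Channel name given explicitly. */
    interface NamedSender {
        @Out("myinput")
        Object sendOrder();
    }

    /** No value given, so the method name becomes the channel name. */
    interface UnnamedSender {
        @Out
        Object sendOrder();
    }

    /** Returns the explicit annotation value, or the method name when no value is set. */
    static String channelName(Method method) {
        Out out = method.getAnnotation(Out.class);
        if (out == null) {
            throw new IllegalArgumentException(method.getName() + " is not annotated with @Out");
        }
        return out.value().isEmpty() ? method.getName() : out.value();
    }

    /** Convenience lookup that wraps the checked reflection exception. */
    static String channelNameOf(Class<?> type, String methodName) {
        try {
            return channelName(type.getMethod(methodName));
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(channelNameOf(NamedSender.class, "sendOrder"));   // prints: myinput
        System.out.println(channelNameOf(UnnamedSender.class, "sendOrder")); // prints: sendOrder
    }
}
```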

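4. Edit the startup class

Add the @EnableBinding annotation to turn on channel binding in the Spring container. With SendService.class as its argument, the container automatically binds the channels declared in the SendService interface at startup.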

package com.example.springproducer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(SendService.class)
public class SpringProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringProducerApplication.class, args);
    }

}
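
SendService is only an interface, yet after binding it can be injected and used. One way a container can supply such an implementation is a dynamic proxy. The sketch below is a rough plain-Java analogy under that assumption, not Spring Cloud Stream's actual code: RecordingChannel, OrderChannels, and bind are hypothetical names, and the "channel" merely records what was sent.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class BindingProxySketch {

    /** Minimal stand-in for a message channel: it only records what was sent. */
    static final class RecordingChannel {
        final String name;
        final List<String> sent = new ArrayList<>();

        RecordingChannel(String name) {
            this.name = name;
        }

        void send(String payload) {
            sent.add(payload);
        }
    }

    /** Hypothetical channel interface, analogous in shape to SendService. */
    interface OrderChannels {
        RecordingChannel sendOrder();
    }

    /** Builds an implementation whose methods each return one channel, keyed by method name. */
    static <T> T bind(Class<T> type) {
        Map<String, RecordingChannel> channels = new ConcurrentHashMap<>();
        Object proxy = Proxy.newProxyInstance(
                type.getClassLoader(),
                new Class<?>[] {type},
                (self, method, args) -> {
                    if (method.getDeclaringClass() == Object.class) {
                        // toString/hashCode/equals are answered by the backing map
                        return method.invoke(channels, args);
                    }
                    return channels.computeIfAbsent(method.getName(), RecordingChannel::new);
                });
        return type.cast(proxy);
    }

    public static void main(String[] args) {
        OrderChannels channels = bind(OrderChannels.class);
        channels.sendOrder().send("hello world");
        // prints: sendOrder -> [hello world]
        System.out.println(channels.sendOrder().name + " -> " + channels.sendOrder().sent);
    }
}
```

Repeated calls to sendOrder() return the same channel object, which is why the controller in the next step can call the method each time it needs to send.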

5. Add a controller class

It calls the send method of SendService to publish a message to the message broker.

package com.example.springproducer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    SendService sendService;

    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public String sendRequest() {
        // Create the message
        Message<byte[]> msg = MessageBuilder.withPayload("hello world".getBytes()).build();
        // Send the message
        sendService.sendOrder().send(msg);
        return "success";
    }
}

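III. Message Consumer

1. Create the project

In IDEA, create a new Spring Boot project named "spring-consumer". Select Spring Boot version 2.1.10, and on the dependencies screen check Web -> Spring Web and Spring Cloud Discovery -> Eureka Discovery Client.
Open pom.xml and add the spring-cloud-starter-stream-rabbit dependency. The complete pom.xml is as follows: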

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-consumer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2. Edit application.yml

server:
  port: 8080
spring:
  application:
    name: spring-consumer
  # Optional: these values match the defaults
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/

3. Write the channel interface for receiving messages

package com.example.springconsumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface ReceiveService {
    @Input("myinput")
    SubscribableChannel myinput();
}

4. Edit the startup class

Bind the message channel here as well.

package com.example.springconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@SpringBootApplication
@EnableBinding(ReceiveService.class)
public class SpringConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringConsumerApplication.class, args);
    }

    // Subscribe to messages on the myinput channel
    @StreamListener("myinput")
    public void receive(byte[] msg) {
        System.out.println("Received message: " + new String(msg));
    }
}

5. Test

(1) Check in the system services list that RabbitMQ is running (it starts by default);

(2) Start spring-server (port 8761);

(3) Start spring-producer (port 8081);

(4) Start spring-consumer (port 8080);

(5) Open http://localhost:8081/send in a browser; the spring-consumer console prints:

Received message: hello world

This shows that the consumer can now receive messages from the message broker.
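
The producer encodes the payload with getBytes() and the listener decodes it with new String(msg), so both sides rely on the JVM's default charset. That is fine for "hello world" on one machine, but it can garble non-ASCII text when the two services run with different defaults. A minimal sketch of naming the charset explicitly follows; the class PayloadCodecSketch and its two helper methods are hypothetical and not part of the projects above.

```java
import java.nio.charset.StandardCharsets;

class PayloadCodecSketch {

    /** Encodes text the way the producer would, but with an explicit charset. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a payload the way the listener would, with the same charset. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("hello world");
        System.out.println(payload.length);  // prints: 11
        System.out.println(decode(payload)); // prints: hello world
    }
}
```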

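IV. Switching the Binder

The example above uses RabbitMQ as the message broker. To use Kafka instead, swap the Maven dependency.
In the pom.xml of both the producer and the consumer, change spring-cloud-starter-stream-rabbit to spring-cloud-starter-stream-kafka.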
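
A dependency swap is enough because the application code only ever talks to channels, never to the broker client. The plain-Java sketch below illustrates that separation with a deliberately simplified Binder interface. All names here (Binder, RabbitLikeBinder, KafkaLikeBinder) are hypothetical; the real binder SPI in Spring Cloud Stream is considerably richer.

```java
import java.util.ArrayList;
import java.util.List;

class BinderSwapSketch {

    /** Hypothetical, much-simplified binder: the only thing application code depends on. */
    interface Binder {
        void publish(String destination, String payload);

        List<String> published();
    }

    /** Pretends to publish to an exchange, as a RabbitMQ-style binder might. */
    static final class RabbitLikeBinder implements Binder {
        private final List<String> log = new ArrayList<>();

        public void publish(String destination, String payload) {
            log.add("exchange=" + destination + " body=" + payload);
        }

        public List<String> published() {
            return log;
        }
    }

    /** Pretends to publish to a topic, as a Kafka-style binder might. */
    static final class KafkaLikeBinder implements Binder {
        private final List<String> log = new ArrayList<>();

        public void publish(String destination, String payload) {
            log.add("topic=" + destination + " value=" + payload);
        }

        public List<String> published() {
            return log;
        }
    }

    /** Application code: identical whichever binder is plugged in. */
    static void sendOrder(Binder binder) {
        binder.publish("myinput", "hello world");
    }

    public static void main(String[] args) {
        Binder rabbit = new RabbitLikeBinder();
        Binder kafka = new KafkaLikeBinder();
        sendOrder(rabbit);
        sendOrder(kafka);
        System.out.println(rabbit.published()); // prints: [exchange=myinput body=hello world]
        System.out.println(kafka.published());  // prints: [topic=myinput value=hello world]
    }
}
```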
