当前位置: 移动技术网 > IT编程>数据库>MSSQL > Spring boot+redis实现消息发布与订阅

Spring boot+redis实现消息发布与订阅

2020年04月25日  | 移动技术网IT编程  | 我要评论

金健米业招聘,虎皮兰,中央4台在线直播

一.创建 Spring Boot 项目并引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- NOTE: fastjson 1.2.41 is an old release with published deserialization CVEs;
     prefer the latest 1.2.x release when adopting this example. -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.41</version>
</dependency>

二.编辑yml配置文件

server:
  port: 7888
# 日志配置
logging:
  config: classpath:log/logback.xml
  level:
    cn.com.dhcc: info
    org.springframework: info
    org.springframework.web: info
    com.alibaba.nacos.client.naming: error
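spring:
  redis:
    host: localhost
    port: 6379
    # Placeholder: replace with the real password. Keep the quotes; a bare value
    # starting with * is parsed by YAML as an alias reference.
    password: "*********"
    database: 1
    jedis:
      pool:
        max-idle: 8
        max-active: 8
        max-wait: -1
        min-idle: 0
    timeout: 5000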

三.配置redis

@configuration
public class redisconfiguration {

    /**
     * 实例化 redistemplate 对象
     *
     * @return
     */
    @bean("redistemplates")
    public redistemplate<string, object> functiondomainredistemplate(redisconnectionfactory redisconnectionfactory) {
        redistemplate<string, object> redistemplate = new redistemplate<>();
        initdomainredistemplate(redistemplate, redisconnectionfactory);
        return redistemplate;
    }

    /**
     * 设置数据存入 redis 的序列化方式,并开启事务
     * 
     * @param redistemplate
     * @param factory
     */
    private void initdomainredistemplate(@qualifier("redistemplates") redistemplate<string, object> redistemplate, redisconnectionfactory factory) {
        // 如果不配置serializer,那么存储的时候缺省使用string,如果用user类型存储,那么会提示错误user can't cast to
        // string!
        redistemplate.setkeyserializer(new stringredisserializer());
        redistemplate.sethashkeyserializer(new stringredisserializer());

        fastjsonredisserializer<object> fastjsonredisserializer = new fastjsonredisserializer<object>(object.class);
        redistemplate.sethashvalueserializer(fastjsonredisserializer);
        redistemplate.setvalueserializer(fastjsonredisserializer);
        //redistemplate.sethashvalueserializer(new genericjackson2jsonredisserializer());
        //redistemplate.setvalueserializer(new genericjackson2jsonredisserializer());
        // 开启事务
        redistemplate.setenabletransactionsupport(true);
        redistemplate.setconnectionfactory(factory);
    }

    /**
     * 注入封装redistemplate @title: redisutil @return redisutil @date
     * 
     */
    @bean(name = "redisutils")
    public redisutils redisutil(@qualifier("redistemplates") redistemplate<string, object> redistemplate) {
        redisutils redisutil = new redisutils();
        redisutil.setredistemplate(redistemplate);
        return redisutil;
    }

四.编写 RedisUtils 消息发布方法

/**
 * Thin helper around a {@code RedisTemplate} that publishes messages to Redis channels.
 *
 * <p>The template must be supplied through {@link #setRedisTemplate} before
 * {@link #publish} is called.
 */
public class RedisUtils {

    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Sets the template used for publishing.
     *
     * @param redisTemplate template to delegate to
     */
    public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * Publishes {@code obj} to the given channel via {@code RedisTemplate.convertAndSend}.
     *
     * @param channel name of the channel to publish to
     * @param obj     message payload; converted by the template before sending
     */
    public void publish(String channel, Object obj) {
        redisTemplate.convertAndSend(channel, obj);
    }
}

五.配置消息监听

/**
 * Spring configuration that wires the Redis subscriber: the listener container,
 * the adapter that dispatches to the receiver, the receiver itself and its latch.
 */
@Configuration
public class RedisMessageListener {

    /**
     * Creates the listener container and subscribes the adapter to the {@code "phone"} topic.
     *
     * @param connectionFactory connection factory the container listens on
     * @param listenerAdapter   adapter that forwards incoming messages to the receiver
     * @return the configured container
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                   MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // Topic the subscriber receives messages from; the publisher must send to the same name.
        container.addMessageListener(listenerAdapter, new PatternTopic("phone"));
        return container;
    }

    /**
     * Binds the receiver to the adapter and names the method invoked for each message.
     *
     * @param receiver object that handles incoming messages
     * @return adapter delegating to {@code receiver}
     */
    @Bean
    public MessageListenerAdapter listenerAdapter(ReceiverRedisMessage receiver) {
        // The method is looked up by name, so this string must match the receiver's
        // handler method exactly (case-sensitive).
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    /**
     * Registers the subscriber.
     *
     * @param latch latch handed to the receiver
     * @return the receiver instance
     */
    @Bean
    ReceiverRedisMessage receiver(CountDownLatch latch) {
        return new ReceiverRedisMessage(latch);
    }

    /**
     * Provides the latch shared with the receiver, initialised with a count of 1.
     *
     * @return a new latch
     */
    @Bean
    public CountDownLatch latch() {
        return new CountDownLatch(1);
    }
}

六.消息订阅方法

/**
 * Subscriber for the {@code phone} channel: parses each incoming JSON message into a
 * {@code User} and counts down the supplied latch once the message has been handled.
 *
 * <p>Instances are created by a {@code @Bean} factory method, so no injection
 * annotation is needed on the constructor.
 */
public class ReceiverRedisMessage {

    private static final Logger log = LoggerFactory.getLogger(ReceiverRedisMessage.class);

    private final CountDownLatch latch;

    /**
     * @param latch latch counted down after every handled message
     */
    public ReceiverRedisMessage(CountDownLatch latch) {
        this.latch = latch;
    }

    /**
     * Handles one message received from the channel.
     *
     * <p>Parsing failures are logged and not rethrown, so one bad message does not
     * propagate an exception to the caller.
     *
     * @param jsonMsg message body as a JSON string
     */
    public void receiveMessage(String jsonMsg) {
        log.info("[开始消费redis消息队列phone数据...]");
        try {
            log.info("监听者收到消息:{}", jsonMsg);
            User user = JSON.parseObject(jsonMsg, User.class);
            log.info("转化为对象 :{}", user);
            log.info("[消费redis消息队列phone数据成功.]");
        } catch (Exception e) {
            // Pass the exception itself as the last argument so the stack trace is logged too.
            log.error("[消费redis消息队列phone数据失败,失败信息:{}]", e.getMessage(), e);
        } finally {
            latch.countDown();
        }
    }
}

七.定时消息发布测试

/**
 * Scheduled test publisher: periodically sends a sample {@code User} to the
 * {@code phone} channel through {@code RedisUtils}.
 */
@EnableScheduling
@Component
public class PublisherController {

    private static final Logger log = LoggerFactory.getLogger(PublisherController.class);

    @Autowired
    private RedisUtils redisUtils;

    /**
     * Publishes one sample user; scheduled with {@code fixedRate = 5000}.
     *
     * @return always {@code "success"}; NOTE(review): a scheduler has no use for a
     *         return value, so this only matters if the method is called directly
     */
    @Scheduled(fixedRate = 5000)
    public String pubMsg() {
        User user = new User(1, "尚***", 26, "男", "陕西省xxxx市xxxxxx县");
        redisUtils.publish("phone", user);
        log.info("publisher sendes topic... ");
        return "success";
    }
}

八.测试结果
spring boot+redis实现消息发布与订阅

九.发布对象user实体

/**
 * Message payload published to Redis in this example.
 *
 * <p>Follows JavaBean conventions (no-arg constructor plus getters/setters) so that
 * JSON libraries can serialize and populate it.
 */
public class User implements Serializable {

    private static final long serialVersionUID = 1L;

    private int id;
    private String name;
    private int age;
    private String sex;
    private String address;

    /** Creates an empty user; fields are filled in through the setters. */
    public User() {
    }

    /**
     * Creates a fully populated user.
     *
     * @param id      user id
     * @param name    user name
     * @param age     user age
     * @param sex     user sex
     * @param address user address
     */
    public User(int id, String name, int age, String sex, String address) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.sex = sex;
        this.address = address;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    @Override
    public String toString() {
        return "User [id=" + id + ", name=" + name + ", age=" + age
                + ", sex=" + sex + ", address=" + address + "]";
    }
}

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网