当前位置: 移动技术网 > IT编程>开发语言>Java > SpringBoot 对IBM MQ进行数据监听接收以及数据发送

SpringBoot 对IBM MQ进行数据监听接收以及数据发送

2019年11月27日  | 移动技术网IT编程  | 我要评论

特搜战队迪加战士,湖南电视台经济频道,宏图三胞官网

一、需求介绍

后端使用 Spring Boot 2.0 框架,需要通过 JMS 实时监听并接收 IBM MQ 队列中的数据进行处理,处理完成后生成回执,再通过 MQ 队列发送出去。

二、引入依赖jar包

<!-- Spring JMS support. -->
<!-- NOTE(review): the article targets Spring Boot 2.0; confirm that pinning 4.3.18.RELEASE
     matches the Spring Framework version already on the classpath. -->
<dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>4.3.18.RELEASE</version>
</dependency>
<!-- JMS API. No version is given here, so it has to come from the build's dependency management. -->
<dependency>
        <groupId>javax.jms</groupId>
        <artifactId>javax.jms-api</artifactId>
</dependency>
<!-- IBM MQ client library. -->
<dependency>
        <groupId>com.ibm.mq</groupId>
        <artifactId>com.ibm.mq.allclient</artifactId>
        <version>9.1.0.0</version>
</dependency>

三、监听实现

代码中分为三大块:

1、MQ 通道连接:我这里使用用户名和密码进行连接;如果不需要密码认证,可以不传入用户名和密码

2、mq的队列连接并实现监听

3、mq发送

@configuration
public class mqtestconfig {
    
    @autowired
    private mqproperties mqproperties;
    
    /**=======================mq 通道工厂============================**/
    @bean(name="mqqueueconnectionfactory")
    public mqqueueconnectionfactory mqqueueconnectionfactory(){
        mqqueueconnectionfactory mqqueueconnectionfactory = new mqqueueconnectionfactory();
        mqqueueconnectionfactory.sethostname(mqproperties.gethostname());
        try {
            mqqueueconnectionfactory.settransporttype(wmqconstants.wmq_cm_client);
            mqqueueconnectionfactory.setccsid(mqproperties.getccsid());
            mqqueueconnectionfactory.setchannel(mqproperties.getchannel());
            mqqueueconnectionfactory.setport(mqproperties.getport());
            mqqueueconnectionfactory.setqueuemanager(mqproperties.getqueuemanager());
        } catch (jmsexception e) {
            e.printstacktrace();
        }
        return mqqueueconnectionfactory;
    }
    @bean(name="usercredentialsconnectionfactoryadapter")
    public usercredentialsconnectionfactoryadapter usercredentialsconnectionfactoryadapter(mqqueueconnectionfactory mqqueueconnectionfactory){
        usercredentialsconnectionfactoryadapter usercredentialsconnectionfactoryadapter = new usercredentialsconnectionfactoryadapter();
        usercredentialsconnectionfactoryadapter.setusername(mqproperties.getusername());
        usercredentialsconnectionfactoryadapter.setpassword(mqproperties.getpassword());
        usercredentialsconnectionfactoryadapter.settargetconnectionfactory(mqqueueconnectionfactory);
        return usercredentialsconnectionfactoryadapter;
    }
    
    /**============================mq 消息监听接收=============================**/
    //队列连接
    @bean(name="mqueue")
    public mqqueue mqueue(){
        mqqueue mqqueue = new mqqueue();
        try {
            mqqueue.setbasequeuename(mqproperties.getbasequeuenamerecv());
            mqqueue.setbasequeuemanagername(mqproperties.getbasequeuemanagername());
        } catch (jmsexception e) {
            e.printstacktrace();
        }
        return mqqueue;
    }
    //对队列进行监听
    @bean(name="simplemessagelistenercontainer")
    public simplemessagelistenercontainer simplemessagelistenercontainer(usercredentialsconnectionfactoryadapter usercredentialsconnectionfactoryadapter,mqqueue mqueue){
        simplemessagelistenercontainer simplemessagelistenercontainer = new simplemessagelistenercontainer();
        simplemessagelistenercontainer.setconnectionfactory(usercredentialsconnectionfactoryadapter);
        simplemessagelistenercontainer.setdestination(mqueue);
        simplemessagelistenercontainer.setmessagelistener(decmqriskrecvservice());
        return simplemessagelistenercontainer;
    }
    //报文处理类
    @bean(name="decmqriskrecvservice")
    public decmqriskrecvservice decmqriskrecvservice(){
        return new decmqriskrecvservice();
    }
    
    /**============================mq 发送消息============================**/
    @bean(name="cachingconnectionfactory")
    public cachingconnectionfactory cachingconnectionfactory(usercredentialsconnectionfactoryadapter usercredentialsconnectionfactoryadapter){
        cachingconnectionfactory cachingconnectionfactory = new cachingconnectionfactory();
        cachingconnectionfactory.settargetconnectionfactory(usercredentialsconnectionfactoryadapter);
        cachingconnectionfactory.setsessioncachesize(5);
        cachingconnectionfactory.setreconnectonexception(true);
        return cachingconnectionfactory;
    }
    @bean(name="jmstransactionmanager")
    public platformtransactionmanager jmstransactionmanager(cachingconnectionfactory cachingconnectionfactory){
        jmstransactionmanager jmstransactionmanager = new jmstransactionmanager();
        jmstransactionmanager.setconnectionfactory(cachingconnectionfactory);
        return jmstransactionmanager;
    }
    @bean(name="jmsoperations")
    public jmsoperations jmsoperations(cachingconnectionfactory cachingconnectionfactory){
        jmstemplate jmstemplate = new jmstemplate(cachingconnectionfactory);
        jmstemplate.setreceivetimeout(mqproperties.getreceivetimeout());
        return jmstemplate;
    }

}

mq配置文件

记得要添加get和set方法

/**
 * IBM MQ connection settings, bound from configuration properties that start with
 * {@link #mq_prefix}.
 *
 * <p>Accessor names are spelled exactly the way the other classes in this file call them.
 * NOTE(review): confirm that the property binder in use recognises these accessor names.
 */
@Configuration
@ConfigurationProperties(prefix=mqproperties.mq_prefix)
public class mqproperties {
    /** Prefix shared by all MQ configuration properties. */
    public static final String mq_prefix = "mq";
    /** Host name of the MQ server. */
    private String hostname;
    /** Port of the MQ server. */
    private int port;
    /** Channel used to connect. */
    private String channel;
    /** CCSID passed to the connection factory. */
    private int ccsid;
    /** User name used when connecting. */
    private String username;
    /** Password used when connecting. */
    private String password;
    /** Queue manager the connection factory connects to. */
    private String queuemanager;
    /** Queue manager name set on the receive queue. */
    private String basequeuemanagername;
    /** Name of the queue that is listened on. */
    private String basequeuenamerecv;
    /** Name of the queue receipts are sent to. */
    private String basequeuenamesend;
    /** Receive timeout handed to the JMS template. */
    private long receivetimeout;

    public String gethostname() {
        return hostname;
    }

    public void sethostname(String hostname) {
        this.hostname = hostname;
    }

    public int getport() {
        return port;
    }

    public void setport(int port) {
        this.port = port;
    }

    public String getchannel() {
        return channel;
    }

    public void setchannel(String channel) {
        this.channel = channel;
    }

    public int getccsid() {
        return ccsid;
    }

    public void setccsid(int ccsid) {
        this.ccsid = ccsid;
    }

    public String getusername() {
        return username;
    }

    public void setusername(String username) {
        this.username = username;
    }

    public String getpassword() {
        return password;
    }

    public void setpassword(String password) {
        this.password = password;
    }

    public String getqueuemanager() {
        return queuemanager;
    }

    public void setqueuemanager(String queuemanager) {
        this.queuemanager = queuemanager;
    }

    public String getbasequeuemanagername() {
        return basequeuemanagername;
    }

    public void setbasequeuemanagername(String basequeuemanagername) {
        this.basequeuemanagername = basequeuemanagername;
    }

    public String getbasequeuenamerecv() {
        return basequeuenamerecv;
    }

    public void setbasequeuenamerecv(String basequeuenamerecv) {
        this.basequeuenamerecv = basequeuenamerecv;
    }

    public String getbasequeuenamesend() {
        return basequeuenamesend;
    }

    public void setbasequeuenamesend(String basequeuenamesend) {
        this.basequeuenamesend = basequeuenamesend;
    }

    public long getreceivetimeout() {
        return receivetimeout;
    }

    public void setreceivetimeout(long receivetimeout) {
        this.receivetimeout = receivetimeout;
    }

}

报文处理类及回执发送

1、报文处理类需要实现 MessageListener 接口并重写 onMessage 方法,方法参数 message 就是监听到的消息。

2、读取报文时为防止乱码,我这边按照格式分两种方式读取转码。

3、发送回执,之前发送发现报文多出了一些报文头信息,所以在队列信息加了

  "queue:///" + mqproperties.getbasequeuenamesend() + "?targetClient=1"

  这样发送的报文会去掉报文头信息。

@service
public class decmqriskrecvservice implements messagelistener {

    @autowired
    private jmsoperations jmsoperations;
    @autowired
    private mqproperties mqproperties;

    @override
    public void onmessage(message message) {
        string str = null;
        // 1、读取报文
        try {
            if (message instanceof bytesmessage) {
                bytesmessage bm = (bytesmessage) message;
                byte[] bys = null;
                bys = new byte[(int) bm.getbodylength()];
                bm.readbytes(bys);
                str = new string(bys, "utf-8");
            } else {
                str = ((textmessage) message).gettext();
                str = new string(str.getbytes("iso-8859-1"), "utf-8");
            }
        } catch (jmsexception e) {
            e.printstacktrace();
        } catch (unsupportedencodingexception e) {
            e.printstacktrace();
        }

        // 2、处理报文

        // 3、组装回执发送
        string receipt = "";
        try {
            jmsoperations.convertandsend("queue:///" + mqproperties.getbasequeuenamesend() + "?targetclient=1", receipt.getbytes("utf-8"));
        } catch (jmsexception e) {
            e.printstacktrace();
        } catch (unsupportedencodingexception e) {
            e.printstacktrace();
        }
    }

}

 

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网