当前位置: 移动技术网 > IT编程>开发语言>Java > java结合WebSphere MQ实现接收队列文件功能

java结合WebSphere MQ实现接收队列文件功能

2019年07月22日  | 移动技术网IT编程  | 我要评论

首先我们先来简单介绍下websphere mq以及安装使用简介

websphere mq  : 用于传输信息 具有跨平台的功能。

1 安装websphere mq 并启动

2 websphere mq 建立 queue manager (如:mqsi_sample_qm)

3 建立queue 类型选择 local类型 的 (如lq  )

4 建立channels 类型选择server connection (如bridgechannel)

接下来,我们来看实例代码:

mqfilereceiver.java

package com.mq.dpca.file;
 
import java.io.file;
import java.io.fileoutputstream;
 
import com.ibm.mq.mqenvironment;
import com.ibm.mq.mqexception;
import com.ibm.mq.mqgetmessageoptions;
import com.ibm.mq.mqmessage;
import com.ibm.mq.mqqueue;
import com.ibm.mq.mqqueuemanager;
import com.ibm.mq.constants.mqconstants;
import com.mq.dpca.msg.mqconfig;
import com.mq.dpca.util.readcmdline;
import com.mq.dpca.util.renameutil;
 
/**
 * 
 * mq分组接收文件功能
 * 主动轮询
 */
public class mqfilereceiver {
  private mqqueuemanager qmgr; // 连接到队列管理器
 
  private mqqueue inqueue; // 传输队列
 
  private string queuename = ""; // 队列名称
 
  private string host = ""; //
 
  private int port = 1414; // 侦听器的端口号
 
  private string channel = ""; // 通道名称
 
  private string qmgrname = ""; // 队列管理器
 
  private mqmessage inmsg; // 创建消息缓冲
 
  private mqgetmessageoptions gmo; // 设置获取消息选项
 
  private static string filename = null; // 接收队列上的消息并存入文件
 
  private int ccsid = 0;
 
  private static string file_dir = null;
 
  /**
   * 程序的入口
   * 
   * @param args
   */
  public static void main(string args[]) {
    mqfilereceiver mfs = new mqfilereceiver();
    //初始化连接
    mfs.initproperty();
    //接收文件
    mfs.rungoupreceiver();
    //获取shell脚本名
//   string shellname = mqconfig.getvaluebykey(filename);
//   if(shellname!=null&&!"".equals(shellname)){
//     //调用shell
//     readcmdline.callshell(shellname);
//   }else{
//     system.out.println("have no shell name,only receive files.");
//   }
 
  }
 
  public void rungoupreceiver() {
    try {
      init();
      getgroupmessages();
      qmgr.commit();
      system.out.println("\n messages successfully receive ");
    } catch (mqexception mqe) {
      mqe.printstacktrace();
      try {
        system.out.println("\n backing out transaction ");
        qmgr.backout();
        system.exit(2);
      } catch (exception e) {
        e.printstacktrace();
        system.exit(2);
      }
    } catch (exception e) {
      e.printstacktrace();
      system.exit(2);
    }
  }
 
  /**
   * 初始化服务器连接信息
   * 
   * @throws exception
   */
  private void init() throws exception {
    /* 为客户机连接设置mqenvironment属性 */
    mqenvironment.hostname = host;
    mqenvironment.channel = channel;
    mqenvironment.port = port;
 
    /* 连接到队列管理器 */
    qmgr = new mqqueuemanager(qmgrname);
 
    /* 设置队列打开选项以输 */
    int opnoptn = mqconstants.mqoo_input_as_q_def
        | mqconstants.mqoo_fail_if_quiescing;
 
    /* 打开队列以输 */
    inqueue = qmgr.accessqueue(queuename, opnoptn, null, null, null);
  }
 
  /**
   * 接受文件的主函数
   * 
   * @throws exception
   */
  public void getgroupmessages() {
    /* 设置获取消息选项 */
    gmo = new mqgetmessageoptions();
    gmo.options = mqconstants.mqgmo_fail_if_quiescing;
    gmo.options = gmo.options + mqconstants.mqgmo_syncpoint;
    /* 等待消息 */
    gmo.options = gmo.options + mqconstants.mqgmo_wait;
    /* 设置等待时间限制 */
    gmo.waitinterval = 5000;
    /* 只获取消息 */
    gmo.options = gmo.options + mqconstants.mqgmo_all_msgs_available;
    /* 以辑顺序获取消息 */
    gmo.options = gmo.options + mqconstants.mqgmo_logical_order;
    gmo.matchoptions = mqconstants.mqmo_match_group_id;
    /* 创建消息缓冲 */
    inmsg = new mqmessage();
    try {
      fileoutputstream fos = null;
      /* 处理组消息 */
      while (true) {
        try {
          inqueue.get(inmsg, gmo);
          if (fos == null) {
            try {
              filename = inmsg.getstringproperty("filename");
              string filename_full = null;
              filename_full = file_dir + renameutil.rename(filename);
              fos = new fileoutputstream(new file(filename_full));
              int msglength = inmsg.getmessagelength();
              byte[] buffer = new byte[msglength];
              inmsg.readfully(buffer);
              fos.write(buffer, 0, msglength);
              /* 查看是否是最后消息标识 */
              char x = gmo.groupstatus;
              if (x == mqconstants.mqgs_last_msg_in_group) {
                system.out.println("last msg in group");
                break;
              }
              inmsg.clearmessage();
 
            } catch (exception e) {
              system.out
                  .println("receiver the message without property,do nothing!");
              inmsg.clearmessage();
            }
          } else {
            int msglength = inmsg.getmessagelength();
            byte[] buffer = new byte[msglength];
            inmsg.readfully(buffer);
            fos.write(buffer, 0, msglength);
            /* 查看是否是最后消息标识 */
            char x = gmo.groupstatus;
            if (x == mqconstants.mqgs_last_msg_in_group) {
              system.out.println("last msg in group");
              break;
            }
            inmsg.clearmessage();
          }
        } catch (exception e) {
          char x = gmo.groupstatus;
          if (x == mqconstants.mqgs_last_msg_in_group) {
            system.out.println("last msg in group");
          }
          break;
        }
      }
      if (fos != null)
        fos.close();
    } catch (exception e) {
      system.out.println(e.getmessage());
    }
  }
 
  public void initproperty() {
    mqconfig config = new mqconfig().getinstance();
    if (config.getmq_manager() != null) {
      qmgrname = config.getmq_manager();
      queuename = config.getmq_queue_name();
      channel = config.getmq_channel();
      host = config.getmq_host_name();
      port = integer.valueof(config.getmq_prot());
      ccsid = integer.valueof(config.getmq_ccsid());
      file_dir = config.getfile_dir();
    }
  }
}

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网