当前位置: 移动技术网 > IT编程>开发语言>Java > RocketMQ初入门踩坑记

RocketMQ初入门踩坑记

2019年10月07日  | 移动技术网IT编程  | 我要评论

青北外院,等你爱我简谱,红酒木瓜靓汤唯一正品官网

本文主要是讲在centos中安装rocketmq并做简单的示例。如果你按照本文安装100%是可以成功的,如果按照阿里官方的说明,那只能呵呵了~

安装

官方地址为:https://rocketmq.apache.org/docs/quick-start/
本人安装如下:

//下载最新的rocketmq
wget http://apache-mirror.8birdsvideo.com/rocketmq/4.4.0/rocketmq-all-4.4.0-bin-release.zip

//解压
unzip rocketmq-all-4.4.0-bin-release.zip

//切换到mq目录
cd rocketmq-all-4.4.0-bin-release

//name server 启动
nohup ./bin/mqnamesrv -n 111.231.xx.xx:9876 &

//-c conf/broker.conf autocreatetopicenable=true 参数需要带上,不然topic需要手动创建
nohup sh bin/mqbroker -n 111.231.xx.xx:9876 -c conf/broker.conf autocreatetopicenable=true &

配置,切换到mq的bin目录下

cd rocketmq-all-4.4.0-bin-release/bin

rocketmq默认最低内存为4g,机器内存不够用的话,找到runserver.sh和runbroker.sh编辑如下:

java_opt="${java_opt} -server -xms256m -xmx256m -xmn125m -xx:metaspacesize=128m -xx:maxmetaspacesize=320m"

运行

运行官方demo,发现如下错误:

21:20:22.249 [nettyclientselector_1] info  rocketmqremoting - closechannel: close the connection to remote address[] result: true
org.apache.rocketmq.remoting.exception.remotingtoomuchrequestexception: senddefaultimpl call timeout
    at org.apache.rocketmq.client.impl.producer.defaultmqproducerimpl.senddefaultimpl(defaultmqproducerimpl.java:640)
    at org.apache.rocketmq.client.impl.producer.defaultmqproducerimpl.send(defaultmqproducerimpl.java:1310)
    at org.apache.rocketmq.client.impl.producer.defaultmqproducerimpl.send(defaultmqproducerimpl.java:1256)
    at org.apache.rocketmq.client.producer.defaultmqproducer.send(defaultmqproducer.java:339)
    at org.apache.rocketmq.example.simple.producer.main(producer.java:40)

运行以下命令查看broker配置并写入远程ip地址:

//查看broker配置
sh ./bin/mqbroker -m

//关闭broker
sh bin/mqshutdown broker

//将本机远程ip写入配置文件中
echo 'brokerip1=111.231.xx.xx' > conf/broker.properties 

//重新启动broker
nohup sh bin/mqbroker -n 111.231.xx.xx:9876 -c conf/broker.conf autocreatetopicenable=true &

管理控制台安装

git地址:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

git clone git@github.com:apache/rocketmq-externals.git
cd  rocketmq-external/rocketmq-console/
mvn clean package -dmaven.test.skip=true

打完包后,运行以下命令

java -jar rocketmq-console-ng-1.0.1.jar --server.port=12181 --rocketmq.config.namesrvaddr=111.231.xx.xx:9876

打开 http://www.lhsxpumps.com/_localhost:12181访问控制台,像如下

在procuder这个页面查询时会出现如下异常:

java.lang.runtimeexception: org.apache.rocketmq.client.exception.mqbrokerexception: code: 1  desc: the producer group[] not exist
for more information, please visit the url, http://rocketmq.apache.org/docs/faq/
        at com.google.common.base.throwables.propagate(throwables.java:160)
        at org.apache.rocketmq.console.service.impl.producerserviceimpl.getproducerconnection(producerserviceimpl.java:38)
        at org.apache.rocketmq.console.controller.producercontroller.producerconnection(producercontroller.java:39)

请把代码中producer.shutdown()这句注掉,生产环境中请加上。

 //producer.shutdown();

代码示例(官方)

生产者

package org.apache.rocketmq.example.simple;

import org.apache.rocketmq.client.exception.mqclientexception;
import org.apache.rocketmq.client.producer.defaultmqproducer;
import org.apache.rocketmq.client.producer.sendresult;
import org.apache.rocketmq.common.message.message;
import org.apache.rocketmq.remoting.common.remotinghelper;

public class producer {
    public static void main(string[] args) throws mqclientexception, interruptedexception {

        defaultmqproducer producer = new defaultmqproducer("producergroupname");

        producer.setnamesrvaddr("111.231.xx.xx:9876");
        producer.start();

        for (int i = 0; i < 10; i++)
            try {
                {
                    message msg = new message("topictest",
                        "taga",
                        "orderid188",
                        "hello world".getbytes(remotinghelper.default_charset));
                    sendresult sendresult = producer.send(msg);
                    system.out.printf("%s%n", sendresult);
                }

            } catch (exception e) {
                e.printstacktrace();
            }

        //producer.shutdown();
    }
}

消费者

package org.apache.rocketmq.example.simple;

import java.util.list;
import org.apache.rocketmq.client.consumer.defaultmqpushconsumer;
import org.apache.rocketmq.client.consumer.listener.consumeconcurrentlycontext;
import org.apache.rocketmq.client.consumer.listener.consumeconcurrentlystatus;
import org.apache.rocketmq.client.consumer.listener.messagelistenerconcurrently;
import org.apache.rocketmq.client.exception.mqclientexception;
import org.apache.rocketmq.common.consumer.consumefromwhere;
import org.apache.rocketmq.common.message.messageext;

public class pushconsumer {

    public static void main(string[] args) throws interruptedexception, mqclientexception {
        defaultmqpushconsumer consumer = new defaultmqpushconsumer("cid_jodie_1");
        consumer.subscribe("topictest", "*");
        consumer.setconsumefromwhere(consumefromwhere.consume_from_first_offset);
        //wrong time format 2017_0422_221800
        //consumer.setconsumetimestamp("20181109221800");
        consumer.setnamesrvaddr("111.231.xx.xx:9876");
        consumer.registermessagelistener(new messagelistenerconcurrently() {

            @override
            public consumeconcurrentlystatus consumemessage(list<messageext> msgs, consumeconcurrentlycontext context) {
                system.out.printf("%s receive new messages: %s %n", thread.currentthread().getname(), msgs);
                return consumeconcurrentlystatus.consume_success;
            }
        });
        consumer.start();
        system.out.printf("consumer started.%n");
    }
}

有更多的文章,请关注查看,更有面试宝典相送
image

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网