在开发环境下,基于Springboot的RocketMQ示例(含安装步骤、错误分析)

Java基础

浏览数:26

2020-6-20

AD:资源代下载服务

在看这文章之前建议先看看先前架构原理介绍文章:

RocketMQ服务器启动

linux环境

  1. 下载编译源码

      # 下载$ 
      > wget wget http://mirror.bit.edu.cn/apache/rocketmq/4.6.0/rocketmq-all-4.6.0-source- > 
      # 解压$
      >unzip rocketmq-all-4.7.0-source-release.zip
      > cd rocketmq-all-4.7.0/
      # 编译$
      > mvn -Prelease-all -DskipTests clean install -U
      > cd distribution/target/rocketmq-4.7.0/rocketmq-4.7.0
  2. 启动 Name Server

     # 启动 Name Server 服务
     > nohup sh bin/mqnamesrv &
     # 启动完成后,查看日志$
     > tail -f ~/logs/rocketmqlogs/namesrv.log
      The Name Server boot success...
  3. 启动 Broker

    conf 目录下,RocketMQ 提供了多种 Broker 的配置文件:

    • broker.conf :单主,异步刷盘。
    • 2m/ :双主,异步刷盘。
    • 2m-2s-async/ :两主两从,异步复制,异步刷盘。
    • 2m-2s-sync/ :两主两从,同步复制,异步刷盘。
    • dledger/ :Dledger 集群,至少三节点
     # 启动 Broker服务
     > nohup sh bin/mqbroker -n localhost:9876 &
     # 启动完成后,查看日志$
     > tail -f ~/logs/rocketmqlogs/broker.log 
      The broker[%s, 172.30.30.233:10911] boot success...

其中,参数:

    • 通过 -c 参数,配置读取的主 Broker 配置
    • 通过 -n 参数,设置 RocketMQ Namesrv 地址
    1. 关闭服务器

      > sh bin/mqshutdown broker
      The mqbroker(36695) is running...
      Send shutdown request to mqbroker(36695) OK
      
      > sh bin/mqshutdown namesrv
      The mqnamesrv(36664) is running...
      Send shutdown request to mqnamesrv(36664

    windows环境

    1. 首先去官网下载编译之后的版本,然后解压到本地目录

      官网链接:
      http://rocketmq.apache.org/do…

      下载目标:Binary: [rocketmq-all-4.7.0-bin-release.zip

    2. 配置ROCKETMQ_HOME到系统环境变量中,启动脚本将读取ROCKETMQ_HOME变量
    3. 分别进入bin目录下 启动如下脚本(需要设置内存参数,防止内存过大,启动失败,具体看<常出现的错误>小节):

      3.1 启动namesrv

      运行命令:

      mqnamesrv.cmd

      log : The Name Server boot success. serializeType=JSON

      3.2 启动brokerserver

      运行命令:

      mqbroker.cmd -n localhost:9876

      log : The broker[IQSZ-L01898, 10.111.45.111:10911] boot success. serializeType=JSON and name server is localhost:9876

    4. 关闭服务器

      mqshutdown.cmd broker

      log:killing name server

      mqshutdown.cmd namesrv

      log:killing broker

    RocketMQ发送消息和消费消息

    RocketMQ发送消息和消费消息,先启动消费者,然后再启动生产者

    添加依赖

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.3.0</version>
    </dependency>

    发送消息

    发送消息–同步

    public class SyncProducer {
        public static void main(String[] args) throws Exception {
            //Instantiate with a producer group name.
            DefaultMQProducer producer = new
                DefaultMQProducer("test-group");
            // Specify name server addresses.
            producer.setNamesrvAddr("localhost:9876");
            //Launch the instance.
            producer.start();
            for (int i = 0; i < 100; i++) {
                //Create a message instance, specifying topic, tag and message body.
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +
                        i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                //Call send message to deliver message to one of brokers.
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
            //Shut down once the producer instance is not longer in use.
            producer.shutdown();
        }
    }

    发送消息–异步

    public class AsyncProducer {
        public static void main(String[] args) throws Exception {
            //Instantiate with a producer group name.
            DefaultMQProducer producer = new DefaultMQProducer("test—group");
            // Specify name server addresses.
            producer.setNamesrvAddr("localhost:9876");
            //Launch the instance.
            producer.start();
            producer.setRetryTimesWhenSendAsyncFailed(0);
            for (int i = 0; i < 100; i++) {
                    final int index = i;
                    //Create a message instance, specifying topic, tag and message body.
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.send(msg, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            System.out.printf("%-10d OK %s %n", index,
                                sendResult.getMsgId());
                        }
                        @Override
                        public void onException(Throwable e) {
                            System.out.printf("%-10d Exception %s %n", index, e);
                            e.printStackTrace();
                        }
                    });
            }
            //Shut down once the producer instance is not longer in use.
            producer.shutdown();
        }

    发送消息–单向

    public class OnewayProducer {
        public static void main(String[] args) throws Exception{
            //Instantiate with a producer group name.
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            // Specify name server addresses.
            producer.setNamesrvAddr("localhost:9876");
            //Launch the instance.
            producer.start();
            for (int i = 0; i < 100; i++) {
                //Create a message instance, specifying topic, tag and message body.
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +
                        i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                //Call send message to deliver message to one of brokers.
                producer.sendOneway(msg);
    
            }
            //Shut down once the producer instance is not longer in use.
            producer.shutdown();
        }
    }

    消费消息

    public class Consumer {
    
        public static void main(String[] args) throws InterruptedException, MQClientException {
    
            // Instantiate with specified consumer group name.
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group");
             
            // Specify name server addresses.
            consumer.setNamesrvAddr("localhost:9876");
            
            // Subscribe one more more topics to consume.
            consumer.subscribe("TopicTest", "*");
            // Register callback to execute on arrival of messages fetched from brokers.
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            //Launch the consumer instance.
            consumer.start();
    
            System.out.printf("Consumer Started.%n");
        }

    常出现的错误

    安装中出现的错误

    防止内存设置过大

    修改
    runbroker.cmd配置文件

    set “JAVA_OPT=%JAVA_OPT% -server -Xms500m -Xmx500m -Xmn500m”

    set “JAVA_OPT=%JAVA_OPT% -XX:MaxDirectMemorySize=1g”

    修改
    runserver.cmd配置文件

    set “JAVA_OPT=%JAVA_OPT% -server -Xms500m -Xmx500m -Xmn500m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m”

    启动NAMESERVER报错

    unrecognized vm option ‘MetasoaceSize=128m’

    解决方法:更换jdk版本为1.8即可

    启动BROKER报错

    错误: 找不到或无法加载主类 xxxxxx’

    解决方法:打开runbroker.cmd(windows),然后将‘%CLASSPATH%’加上英文双引

    使用过程中出现的错误

    No route info of this topic
    1. Broker禁止自动创建Topic,且用户没有通过手工方式创建Topic

      查看是否允许自动创建topic

      命令:mqbroker.cmd -n localhost:9876 -p

      mq开启自动创建topic参数

      命令:mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

    2. Broker 没有正确连接到 Name Server

      查看broker.log日志

      位置: /安装目录/conf/logback_broker.xml中日志位置

      日志信息:broker.log

      日志信息:namesrv.log

    3. Producer 没有正确连接到 Name Server

      linux环境:查询防火墙是否通

    错误分析方法

    日志分析法:

    1. 查看broker日志

      • 关注broker是否有注册到nameserver

        register broker to name server localhost:9876 OK

      • 关注生产者是否连接到broker

        new producer connected, group: test_group channel: ClientChannelInfo ...
      • 查看已经创建的topic是否包含自己想要的topic

        2020-04-21 15:58:22 INFO main – load exist local topic, TopicConfig [topicName=test_group, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]

      • 查看消费者是否连接到broker

        new consumer connected, group: test_group CONSUME_PASSIVELY CLUSTERING channel: ClientChannelInfo …

    2. 查看nameserver日志

      • 关注broker是否注册到nameserver

        new broker registered, localhost:10911

      • 查看topic消息

        2020-04-21 17:03:32 INFO RemotingExecutorThread_1 – new topic registered, test_topic QueueData [brokerName=broker-a, readQueueNums=8, writeQueueNums=8, perm=6, topicSynFlag=0]

    各位看官还可以吗?喜欢的话,动动手指点个,点个关注呗!!谢谢支持!

    欢迎关注公众号【Ccww技术博客】,原创技术文章第一时间推出

    作者:Ccww