重学SpringBoot3-集成RocketMQ(一)

重学SpringBoot3-集成RocketMQ(一)

CoderJia 20 2024-09-10

Spring Boot 3 与 RocketMQ 整合,可以通过 Spring Messaging 结合 RocketMQ 的 rocketmq-spring-boot-starter 实现。在这个整合过程中,RocketMQ 作为消息队列系统,Spring Boot 负责提供应用框架,整合可以让开发者更加便捷地使用 RocketMQ 的生产和消费功能。今天就先介绍下SpringBoot3整合RocketMQ5.x,并给出常见消息类型代码示例。

环境准备

  • Spring Boot 3.x 项目
  • RocketMQ 服务器:版本V5.3,包括 NameServerBroker,可以本地搭建或者使用云服务,搭建部分后面单独出教程。
  • RocketMQ 依赖:Spring Boot 与 RocketMQ 的整合依赖 rocketmq-spring-boot-starter

1. 配置项目依赖

在 Spring Boot 项目的 pom.xml 中添加 RocketMQ 相关依赖。

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version> <!-- 或选择最新稳定版本 -->
</dependency>

2. 配置 RocketMQ 信息

application.yml 文件中配置 RocketMQ 的相关连接信息,包括 name-server 和其他基础配置。

2.1配置文件

rocketmq-spring-boot-starter 2.2.0(不含)以下版本

spring:
  rocketmq:
    name-server: localhost:9876  # NameServer 地址,集群使用';'隔开
    producer:
      group: springboot-producer-group  # 生产者组名称
      send-message-timeout: 3000
      retry-times-when-send-failed: 2
      retry-next-server: true
      access-key: RocketMQ    # 若启用了 ACL 功能
      secret-key: 12345678    # 若启用了 ACL 功能
    consumer:
      group: springboot-consumer-group  # 消费者组名称
      topic: test-topic  # 订阅的主题
      access-key: RocketMQ    # 若启用了 ACL 功能
      secret-key: 12345678    # 若启用了 ACL 功能

rocketmq-spring-boot-starter 2.2.0及其以上版本

rocketmq:
  name-server: localhost:9876  # NameServer 地址,集群使用';'隔开
  producer:
    group: springboot-producer-group  # 生产者组名称
    send-message-timeout: 3000
    retry-times-when-send-failed: 2
    retry-next-server: true
    access-key: RocketMQ    # 若启用了 ACL 功能
    secret-key: 12345678    # 若启用了 ACL 功能
  consumer:
    group: springboot-consumer-group  # 消费者组名称
    topic: test-topic  # 订阅的主题
    access-key: RocketMQ    # 若启用了 ACL 功能
    secret-key: 12345678    # 若启用了 ACL 功能

2.2导入自动配置类

按照之前介绍的自动配置,想让 RocketMQ 配生效,需要在启动类上添加如下代码或单独写个配置类:

@Import(RocketMQAutoConfiguration.class)

否在会报错:A component required a bean of type 'org.apache.rocketmq.spring.core.RocketMQTemplate' that could not be found.

@SpringBootApplication
@Import(RocketMQAutoConfiguration.class)
public class SpringBoot308RocketmqApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBoot308RocketmqApplication.class, args);
    }
}

导入自动配置类

2.3创建Topic

示例代码仅一本地一个服务,即一个生产者和消费者,只需选一个broker,否在有些消息将无法消费。

创建Topic

3. 生产者代码示例

在 Spring Boot 项目中创建一个生产者服务,可以作为工具类,使用 RocketMQ 发送消息。

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class RocketMQProducer {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    // 发送简单消息
    public void sendMessage(String topic, String message) {
        rocketMQTemplate.convertAndSend(topic, message);
        System.out.println("Message sent: " + message);
    }
}

3.1同步消息

同步发送消息是指,Producer 发出⼀条消息后,会在收到 MQ 返回的 ACK 之后才发下⼀条消息。该方式的消息可靠性最高,但消息发送效率太低。RocketMQ 同步消息的方法形如 syncXx()。

    /**
     * 同步类型消息
     *
     * @param topic
     * @param message
     */
    public void sendMessage(String topic, String message) {
        rocketMQTemplate.syncSend(topic, message);
        System.out.println("Message sent: " + message);
    }

同步消息

3.2 异步消息

异步发送消息是指,Producer 发出消息后无需等待 MQ 返回 ACK,直接发送下⼀条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。RocketMQ 同步消息的方法形如 asyncXx()。

/**
 * 异步类型消息
 *
 * @param topic
 * @param message
 */
public void asyncSendMessage(String topic, String message) {
    rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("Async message sent: " + message);
        }

        @Override
        public void onException(Throwable e) {
            System.out.println("Async message error: " + e);
        }
    });
    System.out.println("Message sent: " + message);
}

异步消息

3.3 单向消息

单向发送消息是指,Producer 仅负责发送消息,不等待、不处理 MQ 的 ACK。该发送方式时 MQ 也不返回 ACK。该方式的消息发送效率最高,但消息可靠性较差。

    /**
     * 发送单向消息 
     *
     * @param topic
     * @param message
     */
    public void sendOneWayMessage(String topic, String message) {
        rocketMQTemplate.sendOneWay(topic, message);
        System.out.println("One way message sent: " + message);
    }

单向消息

3.4顺序消息

顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。

    /**
     * 发送顺序消息
     */
    public void sendOrderlyMessage(String topic, String message, String shardingKey) {
        for (int i = 0; i < 10; i++) {
            String orderlyMessage = message + i;
            rocketMQTemplate.syncSendOrderly(topic, orderlyMessage, shardingKey);
            System.out.println("Orderly message sent: " + orderlyMessage + " with shardingKey: " + shardingKey);
        }
    }

顺序消息

3.5延时消息

当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息。延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。延时等级默认有18个,可以在broker.conf中增加配置,然后重启broker:

# 延时等级
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

代码很简单:

    /**
     * 发送延迟消息
     *
     * @param topic
     * @param message
     * @param delayLevel
     */
    public void sendDelayedMessage(String topic, String message, int delayLevel) {
        rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build(), 3000, delayLevel);
        System.out.println("Delayed message sent: " + message + " with delayLevel: " + delayLevel);
    }

延时消息

除此之外,RocketMQ 还支持事务消息、批量消息、消息过滤等,后面再详细介绍。

4. 消费者代码示例

使用 @RocketMQMessageListener 注解来订阅主题并监听消息的到达,处理消息的消费逻辑。

package com.example.boot308rocketmq;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * @author CoderJia
 * @create 2024/09/09 15:12
 * @Description
 **/
@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "springboot-consumer-group")
public class RocketMQConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        // 处理接收到的消息
        System.out.println("Received message: " + message);
    }
}

5. 调用生产者发送消息

为了便于测试,创建一个简单的 Spring Boot Controller层代码,用于调用生产者发送消息。

/*
 * Copyright 2013-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.boot308rocketmq.controller;

import com.example.boot308rocketmq.RocketMQProducer;
import jakarta.annotation.Resource;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;

/**
 * @author CoderJia
 * @create 2024/9/9 下午 15:08
 * @Description
 **/
@Controller
public class MessageController {

    @Resource
    private RocketMQProducer rocketMQProducer;

    @GetMapping("/sendMessage")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        rocketMQProducer.sendOneWayMessage("test-topic", message);
        return ResponseEntity.ok("Message sent: " + message);
    }

    @GetMapping("/sendOrderlyMessage")
    public ResponseEntity<String> sendOrderlyMessage(@RequestParam String message) {
        rocketMQProducer.sendOrderlyMessage("test-topic", message, "orderKey");
        return ResponseEntity.ok("Message sent: " + message);
    }

    @GetMapping("/sendDelayedMessage")
    public ResponseEntity<String> sendDelayedMessage(@RequestParam String message, @RequestParam int delayLevel) {
        rocketMQProducer.sendDelayedMessage("test-topic", message, delayLevel);
        return ResponseEntity.ok("Delayed message sent: " + message + " with delayLevel: " + delayLevel);
    }
}

6. 启动项目并验证

  1. 启动 RocketMQ 的 NameServerBroker
  2. 启动 Spring Boot 项目。
  3. 打开浏览器或者使用 Postman 访问发送消息的接口:

普通消息

http://localhost:8080/sendMessage?message=HelloRocketMQ

顺序消息

http://localhost:8080/sendOrderlyMessage?message=HelloRocketMQ

延迟消息

http://localhost:8080/sendDelayedMessage?message=HelloDelayedRocketMQ&delayLevel=3

7. 整合总结

  • 生产者:通过 RocketMQTemplate 提供了发送消息的方法,包括同步消息、异步消息、顺序消息、延迟消息等。
  • 消费者:使用 @RocketMQMessageListener 注解,能够便捷地监听指定主题并消费消息。
  • 事务消息:RocketMQ 还支持事务消息,适合实现两阶段提交的事务模型,后面会着重介绍。

这种整合方式在 Spring Boot 3 中非常自然,并且 rocketmq-spring-boot-starter 进一步简化了配置和集成,使得开发者可以专注于业务逻辑的实现。