Skip to content
On this page

消息中间件集成组件

使用

1. 引入依赖

xml
<dependency>
    <!-- Starter dependency for the message-middleware integration component described on this page. -->
    <groupId>com.chinapost.mids</groupId>
    <artifactId>cpms-rocketmq-apache-starter</artifactId>
    <version>1.0.0-RELEASE</version>
</dependency>

2. application.yml

yaml
rocketmq:
  # NameServer address as host:port; "xxxx" is a placeholder, replace it with the real port.
  name-server: 127.0.0.1:xxxx
  producer:
    group: simple-producer-group
    # Send timeout in milliseconds; the default is 3000.
    send-message-timeout: 3000

3. 生产者示例

java
@Slf4j
@RestController
@RequestMapping("/rocketMQ")
public class RocketMqSendController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    // 同步消息
    @GetMapping("/syncSend")
    public void syncSend() {
        for (int i = 0; i < 10; i++) {
            Message<String> bashMessage = new GenericMessage<>("test_producer" + i);
            SendResult syncSend = rocketMQTemplate.syncSend("test-topic:tagA", bashMessage);
            System.out.println(syncSend);
        }
    }
    
    // 异步消息
    @GetMapping("/asyncSend")
    public void asyncSend() {
        for (int i = 0; i < 10; i++) {
            Message<String> message = new GenericMessage<>("test_producer" + i);
            rocketMQTemplate.asyncSend("test-topic:tagA", message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("回调成功,发送的消息:" + sendResult.toString());
                }

                @Override
                public void onException(Throwable e) {
                    System.out.println("发送失败");
                }
            });
        }
    }
    
    // 单向发送消息
    @GetMapping("/sendOneWay")
    public void sendOneWay() {
        for (int i = 0; i < 100; i++) {
            Message<String> message = new GenericMessage<>("test_producer" + i);
            rocketMQTemplate.sendOneWay("test-topic:tagA", message);
            System.out.println("只发送一次");
        }
    }

    // 发送有序消息
    @GetMapping("/syncSendOrder")
    public void syncSendOrder() {
        String[] tags = new String[]{"TagA", "TagC", "TagD"};
        for (int i = 0; i < 10; i++) {
            // 加个时间前缀
            Message<String> message = new GenericMessage<>("我是顺序消息" + i);
            SendResult sendResult = rocketMQTemplate.syncSendOrderly("test-topic:" + tags[i % tags.length], message,
                    i + "");
            System.out.println(sendResult);
        }
    }

}

4. 消费者示例(创建监听)

java
/**
 * Example listener registered for topic {@code test-topic} in consumer group
 * {@code test_consumer-group}, with selector expression {@code tagA}.
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "test-topic",
        consumerGroup = "test_consumer-group",
        selectorExpression = "tagA")
public class DemoConsumer implements RocketMQListener<String> {

    /**
     * Logs the id of the thread handling the callback together with the message body.
     *
     * @param message the message payload handed to this listener
     */
    @Override
    public void onMessage(String message) {
        long consumerThreadId = Thread.currentThread().getId();
        log.info("[onMessage][线程编号:{} 消息内容:{}]", consumerThreadId, message);
    }
}