Appearance
消息中间件集成组件
使用
1. 引入依赖
xml
<dependency>
<groupId>com.chinapost.mids</groupId>
<artifactId>cpms-rocketmq-apache-starter</artifactId>
<version>1.0.0-RELEASE</version>
</dependency>
2. application.yml
yaml
rocketmq:
  # 将 xxxx 替换为 NameServer 的实际端口(默认 9876)
  name-server: 127.0.0.1:xxxx
  producer:
    group: simple-producer-group
    # 发送超时时间(毫秒),默认 3000
    send-message-timeout: 3000
3. 生产者示例
java
@Slf4j
@RestController
@RequestMapping("/rocketMQ")
public class RocketMqSendController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 同步消息
@GetMapping("/syncSend")
public void syncSend() {
for (int i = 0; i < 10; i++) {
Message<String> bashMessage = new GenericMessage<>("test_producer" + i);
SendResult syncSend = rocketMQTemplate.syncSend("test-topic:tagA", bashMessage);
System.out.println(syncSend);
}
}
// 异步消息
@GetMapping("/asyncSend")
public void asyncSend() {
for (int i = 0; i < 10; i++) {
Message<String> message = new GenericMessage<>("test_producer" + i);
rocketMQTemplate.asyncSend("test-topic:tagA", message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("回调成功,发送的消息:" + sendResult.toString());
}
@Override
public void onException(Throwable e) {
System.out.println("发送失败");
}
});
}
}
// 单向发送消息
@GetMapping("/sendOneWay")
public void sendOneWay() {
for (int i = 0; i < 100; i++) {
Message<String> message = new GenericMessage<>("test_producer" + i);
rocketMQTemplate.sendOneWay("test-topic:tagA", message);
System.out.println("只发送一次");
}
}
// 发送有序消息
@GetMapping("/syncSendOrder")
public void syncSendOrder() {
String[] tags = new String[]{"TagA", "TagC", "TagD"};
for (int i = 0; i < 10; i++) {
// 加个时间前缀
Message<String> message = new GenericMessage<>("我是顺序消息" + i);
SendResult sendResult = rocketMQTemplate.syncSendOrderly("test-topic:" + tags[i % tags.length], message,
i + "");
System.out.println(sendResult);
}
}
}
4. 消费者示例(创建监听)
java
/**
 * Message listener registered for {@code test-topic} with selector expression {@code tagA},
 * in consumer group {@code test_consumer-group}. Receives payloads as {@code String}.
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "test-topic",
        consumerGroup = "test_consumer-group",
        selectorExpression = "tagA")
public class DemoConsumer implements RocketMQListener<String> {

    /**
     * Logs the id of the handling thread together with the received payload.
     *
     * @param message the message body handed to this listener
     */
    @Override
    public void onMessage(String message) {
        final long threadId = Thread.currentThread().getId();
        log.info("[onMessage][线程编号:{} 消息内容:{}]", threadId, message);
    }
}