文章目录
- 一、背景
- 二、环境准备
- 三、项目使用
- 1、业务需求:
- 2、实现原理
- 3、代码实现
- 3.1、生产端——消息发送
- 3.2、消费端——延迟消息监听
- 四、保证消息消费成功
一、背景
避免用户未付款订单占用库存资源。
二、环境准备
- 下载安装RocketMQ
- SpringBoot整合RocketMQ——rocketmq-spring-boot-starter
三、项目使用
1、业务需求:
订单超时30分钟,取消订单。
2、实现原理
利用RocketMQ的延迟消息实现。
rocketMQTemplate.syncSend(主题, 消息, 超时时间, 延迟等级);
RocketMQ默认提供了18个延迟等级,延迟30分钟是等级16。
延迟等级 | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
延迟时间 | 0 | 1s | 5s | 10s | 30s | 1m | 2m | 3m | 4m | 5m | 6m | 7m | 8m | 9m | 10m | 20m | 30m | 1h | 2h |
3、代码实现
3.1、生产端——消息发送
@Service
public class Server {private RocketMQTemplate rocketMQTemplate;public Server(RocketMQTemplate rocketMQTemplate) {this.rocketMQTemplate = rocketMQTemplate;}/***发送延迟订单消息**/public void sendDelayOrderMsg(String msg) {Message<String> msgBody = MessageBuilder.withPayload(msg).build();rocketMQTemplate.syncSend("delay_order", msgBody , 2000, 16);}
}
3.2、消费端——延迟消息监听
@Component
@RocketMQMessageListener(topic = "delay_order",consumerGroup = "order-consumer", selectorExpression = "*")
public class OrderListner implements RocketMQListener<String> {@Overridepublic void onMessage(String msg) {System.out.println("接收到的延迟消息:"+msg);//todo 自定义义务处理。//逻辑删除该订单数据}
}
四、保证消息消费成功
rocketmq-spring-boot-starter在监听消息就实现了自动提交ack。
RocketMQListener的onMessage方法不抛异常都会自动提交ack。
抛出异常则进行重试。