redis stream 消息自动确认

Blade 未结 2 277
nunchan
nunchan 2024-06-18 08:57
悬赏:5

一、该问题的重现步骤是什么?

1. 发送消息

rStreamTemplate.send(CommonRedisKey.ECANG_ORDER_UPDATE_QUEUE, CommonRedisKey.ECANG_ORDER_UPDATE_DATA_KEY, EcangOrderUpdateMsg.builder().orderId(RandomUtil.randomLong()).build());

2. 监听

@RStreamListener(name = CommonRedisKey.ECANG_ORDER_UPDATE_QUEUE, group = CommonRedisKey.ECANG_ORDER_UPDATE_GROUP, autoAcknowledge = false)
public void onMessage(Record> message){
    log.info("ecang order update listener, receive message:{}", JSON.toJSONString(message));
    final EcangOrderUpdateMsg value = message.getValue().get(CommonRedisKey.ECANG_ORDER_UPDATE_DATA_KEY);
    final Boolean updated = ecangService.updateEcangOrder(value.getOrderId());
    if (updated) {
        // 确认消息
        streamTemplate.acknowledge(CommonRedisKey.ECANG_ORDER_UPDATE_GROUP, message);
        log.info("ecang order update listener update order success, orderId:{}", value.getOrderId());
    }else{
        log.info("ecang order update listener update order fail, orderId:{}", value.getOrderId());
    }
}



二、你期待的结果是什么?实际看到的又是什么?

当更新方法返回false的时候,消息不被自动ack而是放到pending列表中

目前不管更新返回的是true还是false,消息都会被自动ack,pending列表中均看不到


三、你正在使用的是什么产品,什么版本?在什么操作系统上?

产品:BladeX-Boot

版本:3.4.0.RELEASE

操作系统: Windows Server 2022 Datacenter  21H2  /  macOS Sonoma 14.5

Redis: 5.0.7

四、请提供详细的错误堆栈信息,这很重要。


五、若有更多详细信息,请在下面提供。

2条回答
提交回复