redis stream 消息自动确认

Blade 未结 2 275
nunchan
nunchan 2024-06-18 08:57
悬赏:5

一、该问题的重现步骤是什么?

1. 发送消息

rStreamTemplate.send(CommonRedisKey.ECANG_ORDER_UPDATE_QUEUE, CommonRedisKey.ECANG_ORDER_UPDATE_DATA_KEY, EcangOrderUpdateMsg.builder().orderId(RandomUtil.randomLong()).build());

2. 监听

@RStreamListener(name = CommonRedisKey.ECANG_ORDER_UPDATE_QUEUE, group = CommonRedisKey.ECANG_ORDER_UPDATE_GROUP, autoAcknowledge = false)
public void onMessage(Record> message){
    log.info("ecang order update listener, receive message:{}", JSON.toJSONString(message));
    final EcangOrderUpdateMsg value = message.getValue().get(CommonRedisKey.ECANG_ORDER_UPDATE_DATA_KEY);
    final Boolean updated = ecangService.updateEcangOrder(value.getOrderId());
    if (updated) {
        // 确认消息
        streamTemplate.acknowledge(CommonRedisKey.ECANG_ORDER_UPDATE_GROUP, message);
        log.info("ecang order update listener update order success, orderId:{}", value.getOrderId());
    }else{
        log.info("ecang order update listener update order fail, orderId:{}", value.getOrderId());
    }
}



二、你期待的结果是什么?实际看到的又是什么?

当更新方法返回false的时候,消息不被自动ack而是放到pending列表中

目前不管更新返回的是true还是false,消息都会被自动ack,pending列表中均看不到


三、你正在使用的是什么产品,什么版本?在什么操作系统上?

产品:BladeX-Boot

版本:3.4.0.RELEASE

操作系统: Windows Server 2022 Datacenter  21H2  /  macOS Sonoma 14.5

Redis: 5.0.7

四、请提供详细的错误堆栈信息,这很重要。


五、若有更多详细信息,请在下面提供。

2条回答
  •  nunchan
    nunchan (楼主)
    2024-06-20 10:01

    Redis的XREADGROUP命令有一个NOACK选项,添加NOACK选项则表示消息会自动被ACK。



     Redis5.0.7的XREAD命令是没有NOACK选项的。

    测试如下:

    127.0.0.1:6379> XREAD COUNT 1 NOACK STREAMS ecang:order:update $
    (error) ERR The NOACK option is only supported by XREADGROUP. You called XREAD instead.



    通过查看框架的源码: org.springblade.core.redis.stream.RStreamListenerDetector,类的这个方法:


    private void cluster(Consumer consumer, StreamOffset streamOffset, RStreamListener listener, Object bean, Method method) {
        boolean autoAcknowledge = listener.autoAcknowledge();
        StreamMessageListenerContainer.ConsumerStreamReadRequest readRequest = StreamMessageListenerContainer.StreamReadRequest.builder(streamOffset).consumer(consumer).autoAcknowledge(autoAcknowledge).build();
        StreamOperations opsForStream = redisTemplate.opsForStream();
        streamMessageListenerContainer.register(readRequest, (message) -> {
            // MapBackedRecord
            invokeMethod(bean, method, message, listener.readRawBytes());
            // ack
            // 当autoAcknowledge=false时,XREADGROUP不会添加NOACK选项,
            // 但是下面这个if判断又把消息给 ACK了
            if (!autoAcknowledge) {
                opsForStream.acknowledge(consumer.getGroup(), message);
            }
        });
    }



    所以,通过修改BladeX-Tool的源码,把上面cluster方法修改成如下即可解决问题:

    private void cluster(Consumer consumer, StreamOffset streamOffset, RStreamListener listener, Object bean, Method method) {
        boolean autoAcknowledge = listener.autoAcknowledge();
        StreamMessageListenerContainer.ConsumerStreamReadRequest readRequest = StreamMessageListenerContainer.StreamReadRequest.builder(streamOffset).consumer(consumer).autoAcknowledge(autoAcknowledge).build();
        StreamOperations opsForStream = redisTemplate.opsForStream();
        streamMessageListenerContainer.register(readRequest, (message) -> {
            // MapBackedRecord
            invokeMethod(bean, method, message, listener.readRawBytes());
            // ack
            // 修改if的条件
            if (autoAcknowledge) {
                opsForStream.acknowledge(consumer.getGroup(), message);
            }
        });
    }


    如果XREADGROUP自动ACK了消息,再调用XACK 也是没有问题的。

    经测试,可以多次XACK同一条消息,但只有第一条会返回成功

    127.0.0.1:6379> XACK ecang:order:update ecang:order:update:group 1718847213961-0
    (integer) 1
    127.0.0.1:6379> XACK ecang:order:update ecang:order:update:group 1718847213961-0
    (integer) 0
    127.0.0.1:6379> XACK ecang:order:update ecang:order:update:group 1718847213961-0
    (integer) 0


    回答: 2024-06-20 10:01

提交回复