一、该问题的重现步骤是什么?
1. 发送消息
rStreamTemplate.send(CommonRedisKey.ECANG_ORDER_UPDATE_QUEUE, CommonRedisKey.ECANG_ORDER_UPDATE_DATA_KEY, EcangOrderUpdateMsg.builder().orderId(RandomUtil.randomLong()).build());
2. 监听
@RStreamListener(name = CommonRedisKey.ECANG_ORDER_UPDATE_QUEUE, group = CommonRedisKey.ECANG_ORDER_UPDATE_GROUP, autoAcknowledge = false)
public void onMessage(Record<String, Map<String, EcangOrderUpdateMsg>> message){
log.info("ecang order update listener, receive message:{}", JSON.toJSONString(message));
final EcangOrderUpdateMsg value = message.getValue().get(CommonRedisKey.ECANG_ORDER_UPDATE_DATA_KEY);
final Boolean updated = ecangService.updateEcangOrder(value.getOrderId());
if (updated) {
// 确认消息
streamTemplate.acknowledge(CommonRedisKey.ECANG_ORDER_UPDATE_GROUP, message);
log.info("ecang order update listener update order success, orderId:{}", value.getOrderId());
}else{
log.info("ecang order update listener update order fail, orderId:{}", value.getOrderId());
}
}
二、你期待的结果是什么?实际看到的又是什么?
当更新方法返回false的时候,消息不被自动ack而是放到pending列表中
目前不管更新返回的是true还是false,消息都会被自动ack,pending列表中均看不到
三、你正在使用的是什么产品,什么版本?在什么操作系统上?
产品:BladeX-Boot
版本:3.4.0.RELEASE
操作系统: Windows Server 2022 Datacenter 21H2 / macOS Sonoma 14.5
Redis: 5.0.7
四、请提供详细的错误堆栈信息,这很重要。
五、若有更多详细信息,请在下面提供。
Redis的XREADGROUP命令有一个NOACK选项,添加NOACK选项则表示消息会自动被ACK。
Redis5.0.7的XREAD命令是没有NOACK选项的。
测试如下:
127.0.0.1:6379> XREAD COUNT 1 NOACK STREAMS ecang:order:update $
(error) ERR The NOACK option is only supported by XREADGROUP. You called XREAD instead.
通过查看框架的源码: org.springblade.core.redis.stream.RStreamListenerDetector,类的这个方法:
private void cluster(Consumer consumer, StreamOffset<String> streamOffset, RStreamListener listener, Object bean, Method method) {
boolean autoAcknowledge = listener.autoAcknowledge();
StreamMessageListenerContainer.ConsumerStreamReadRequest<String> readRequest = StreamMessageListenerContainer.StreamReadRequest.builder(streamOffset).consumer(consumer).autoAcknowledge(autoAcknowledge).build();
StreamOperations<String, Object, Object> opsForStream = redisTemplate.opsForStream();
streamMessageListenerContainer.register(readRequest, (message) -> {
// MapBackedRecord
invokeMethod(bean, method, message, listener.readRawBytes());
// ack
// 当autoAcknowledge=false时,XREADGROUP不会添加NOACK选项,
// 但是下面这个if判断又把消息给 ACK了
if (!autoAcknowledge) {
opsForStream.acknowledge(consumer.getGroup(), message);
}
});
}
所以,通过修改BladeX-Tool的源码,把上面cluster方法修改成如下即可解决问题:
private void cluster(Consumer consumer, StreamOffset<String> streamOffset, RStreamListener listener, Object bean, Method method) {
boolean autoAcknowledge = listener.autoAcknowledge();
StreamMessageListenerContainer.ConsumerStreamReadRequest<String> readRequest = StreamMessageListenerContainer.StreamReadRequest.builder(streamOffset).consumer(consumer).autoAcknowledge(autoAcknowledge).build();
StreamOperations<String, Object, Object> opsForStream = redisTemplate.opsForStream();
streamMessageListenerContainer.register(readRequest, (message) -> {
// MapBackedRecord
invokeMethod(bean, method, message, listener.readRawBytes());
// ack
// 修改if的条件
if (autoAcknowledge) {
opsForStream.acknowledge(consumer.getGroup(), message);
}
});
}
如果XREADGROUP自动ACK了消息,再调用XACK 也是没有问题的。
经测试,可以多次XACK同一条消息,但只有第一条会返回成功
127.0.0.1:6379> XACK ecang:order:update ecang:order:update:group 1718847213961-0
(integer) 1
127.0.0.1:6379> XACK ecang:order:update ecang:order:update:group 1718847213961-0
(integer) 0
127.0.0.1:6379> XACK ecang:order:update ecang:order:update:group 1718847213961-0
(integer) 0
学习Redis Stream 可以参考此篇博文:https://juejin.cn/post/7112825943231561741
感谢反馈,问题已记录
扫一扫访问 Blade技术社区 移动端