我在 RocketMQMessageListener 监听类中调用 feign,一直返回 fallback 回调

Blade 未结 2 336
zdd
zdd 2024-03-06 10:12

一、该问题的重现步骤是什么?

 1. RocketMQConsumer

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "consume_group_pc_industry_2",
   topic = RocketMqConstant.TOPIC_ENTERPRISE_RISK_DISTRIBUTION_MAP_INBOUND)
public class EnterpriseRiskDistributionMapConsumer implements RocketMQListener {

   @Resource
   private RedisTemplate, Object> redisTemplate; // redisTemplate

   @Resource
   private SimpleSyncMessageProducer simpleSyncMessageProducer;

   @Resource
   private IDictClient dictClient;


R value = dictClient.getValue(DictEnum.DATA_OPERATE_TYPE.getName(), inbound.getDataOperate());

此处调用 feign 直接返回的是 IDictClientFallback 中的失败回调

2. 但是我在同一个项目的 controller 中可以获取到字典值


二、你期待的结果是什么?实际看到的又是什么?
我期望可以获取到字典的值

实际是错误回调

image.png


三、你正在使用的是什么产品,什么版本?在什么操作系统上?

bladex 3.0.1 windows


四、请提供详细的错误堆栈信息,这很重要。


五、若有更多详细信息,请在下面提供。

类中的全部代码

package org.springblade.prevention.industry.rmq.rocketmq;

import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springblade.core.tool.api.R;
import org.springblade.core.tool.utils.BeanUtil;
import org.springblade.core.tool.utils.Func;
import org.springblade.prevention.industry.rmq.cache.DeptCodeMapCache;
import org.springblade.prevention.industry.rmq.constant.RedisKeyConstant;
import org.springblade.prevention.industry.rmq.constant.RocketMqConstant;
import org.springblade.prevention.industry.rmq.entity.EnterpriseRiskDistributeMap;
import org.springblade.prevention.industry.rmq.entity.RmqMsgRecord;
import org.springblade.prevention.industry.rmq.protobuf.entity.CommonOutboundVO;
import org.springblade.prevention.industry.rmq.protobuf.entity.EnterpriseRiskDistributionMapInboundVO;
import org.springblade.prevention.industry.rmq.rocketmq.dto.CheckMsgDTO;
import org.springblade.prevention.industry.rmq.rocketmq.dto.OutboundCodeEnum;
import org.springblade.prevention.industry.rmq.rocketmq.util.CommonCheckUtil;
import org.springblade.prevention.industry.rmq.wrapper.BeanCommonPropertiesSetWrapper;
import org.springblade.system.enums.DictEnum;
import org.springblade.system.enums.YesNoEnum;
import org.springblade.system.feign.IDictClient;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

/**
* EnterpriseRiskDistributionMapConsumer 企业 风险分布图 消费者
*
*
* 1.消息接收到, 优先放入到redis2.
*
* @author zdd
* @date 2024/3/1 15:05
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "consume_group_pc_industry_2",
   topic = RocketMqConstant.TOPIC_ENTERPRISE_RISK_DISTRIBUTION_MAP_INBOUND)
public class EnterpriseRiskDistributionMapConsumer implements RocketMQListener {

   @Resource
   private RedisTemplate, Object> redisTemplate; // redisTemplate

   @Resource
   private SimpleSyncMessageProducer simpleSyncMessageProducer;

   @Resource
   private IDictClient dictClient;

   @Override
   public void onMessage(MessageExt messageExt) {
       // MessageExt对象中获取tag
       String tag = messageExt.getTags();
       String uniqueKey = null;
       try {
           String msgId = messageExt.getMsgId();
           // MessageExt转换为您的业务对象
           EnterpriseRiskDistributionMapInboundVO.EnterpriseRiskDistributionMapInbound inbound =
               EnterpriseRiskDistributionMapInboundVO.EnterpriseRiskDistributionMapInbound.parseFrom(
                   messageExt.getBody());
           uniqueKey = inbound.getUniqueKey();
           String jsonString = JsonFormat.printer().print(inbound);
           log.info(
               RocketMqConstant.TOPIC_ENTERPRISE_RISK_DISTRIBUTION_MAP_INBOUND + " Receive Message with Tag:{}, msgId:{} Content:{}",
               tag, msgId, jsonString);
           // 校验 数据
           CommonOutboundVO.CommonOutbound commonOutbound = checkParam(inbound);
           RmqMsgRecord rmqMsgRecord;
           if (commonOutbound.getCode() == OutboundCodeEnum.SUCCESS.getCode()) {
               rmqMsgRecord = convertToRmqMsgRecord(msgId, YesNoEnum.YES.getKey(), jsonString, inbound);
               // 消息转换成 对应的表实体
               EnterpriseRiskDistributeMap entity = convertToEntity(inbound);
               // 消息存放到redis
               redisTemplate.opsForList()
                   .leftPush(RedisKeyConstant.REDIS_KEY_RMQ_ENTERPRISE_RISK_DISTRIBUTION_MAP_INBOUND, entity);
           } else {
               rmqMsgRecord = convertToRmqMsgRecord(msgId, YesNoEnum.NO.getKey(), jsonString, inbound);
           }
           // 消息存放到redis
           redisTemplate.opsForList().leftPush(RedisKeyConstant.REDIS_KEY_RMQ_RECORD, rmqMsgRecord);

           // 发送出站消息
           simpleSyncMessageProducer.sendCommonOutMsg(RocketMqConstant.TOPIC_ENTERPRISE_RISK_DISTRIBUTION_MAP_OUTBOUND,
               tag, commonOutbound);
       } catch (InvalidProtocolBufferException e) {
           log.error("消息解析失败", e);
           simpleSyncMessageProducer.sendCommonOutDataFormatErr(
               RocketMqConstant.TOPIC_ENTERPRISE_RISK_DISTRIBUTION_MAP_OUTBOUND, tag, uniqueKey);
       }
   }

   private CommonOutboundVO.CommonOutbound checkParam(
       EnterpriseRiskDistributionMapInboundVO.EnterpriseRiskDistributionMapInbound inbound) {
       CommonOutboundVO.CommonOutbound outBound =
           CommonOutboundVO.CommonOutbound.newBuilder().setUniqueKey(inbound.getUniqueKey())
               .setResponseTime(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"))
               .setCode(CheckMsgDTO.success().getCode()).setMessage(CheckMsgDTO.success().getMessage()).build();

       R value = dictClient.getValue(DictEnum.DATA_OPERATE_TYPE.getName(), inbound.getDataOperate());

       // 优先校验 是否存在企业成果任务
       if (Func.isEmpty(inbound.getPlanYear())) {
           CheckMsgDTO msgDTO = CheckMsgDTO.paramError("缺少计划年份");
           outBound = outBound.toBuilder().setCode(msgDTO.getCode()).setMessage(msgDTO.getMessage()).build();
           return outBound;
       } else if (Func.isEmpty(inbound.getUnifiedSocialCreditCode())) {
           CheckMsgDTO msgDTO = CheckMsgDTO.paramError("缺少企业社会统一信用代码");
           outBound = outBound.toBuilder().setCode(msgDTO.getCode()).setMessage(msgDTO.getMessage()).build();
           return outBound;
       }
       CommonOutboundVO.CommonOutbound commonOutbound =
           CommonCheckUtil.checkTaskInfo(outBound, inbound.getPlanYear(), inbound.getUnifiedSocialCreditCode());
       if (commonOutbound.getCode() == OutboundCodeEnum.NOT_FOUND.getCode()) {
           return commonOutbound;
       }
       if (Func.isEmpty(inbound.getUniqueKey())) {
           CheckMsgDTO msgDTO = CheckMsgDTO.paramError("uniqueKey 不可为空");
           outBound = outBound.toBuilder().setCode(msgDTO.getCode()).setMessage(msgDTO.getMessage()).build();
           return outBound;
       } else if (Func.isEmpty(inbound.getReportTime())) {
           CheckMsgDTO msgDTO = CheckMsgDTO.paramError("reportTime 不可为空");
           outBound = outBound.toBuilder().setCode(msgDTO.getCode()).setMessage(msgDTO.getMessage()).build();
           return outBound;
       } else if (Func.isEmpty(inbound.getCityIndustryUnit())) {
           CheckMsgDTO msgDTO = CheckMsgDTO.paramError("cityIndustryUnit 不可为空");
           outBound = outBound.toBuilder().setCode(msgDTO.getCode()).setMessage(msgDTO.getMessage()).build();
           return outBound;
       } else if (Func.isEmpty(inbound.getDistrictRegulatoryDept())) {
           CheckMsgDTO msgDTO = CheckMsgDTO.paramError("districtRegulatoryDept 不可为空");
           outBound = outBound.toBuilder().setCode(msgDTO.getCode()).setMessage(msgDTO.getMessage()).build();
           return outBound;
       } else if (Func.isEmpty(inbound.getDistrictAnWeiDept())) {
           CheckMsgDTO msgDTO = CheckMsgDTO.paramError("districtAnWeiDept 不可为空");
           outBound = outBound.toBuilder().setCode(msgDTO.getCode()).setMessage(msgDTO.getMessage()).build();
           return outBound;
       } else if (Func.isEmpty(inbound.getDataOperate())) {
           CheckMsgDTO msgDTO = CheckMsgDTO.paramError("dataOperate 不可为空");
           outBound = outBound.toBuilder().setCode(msgDTO.getCode()).setMessage(msgDTO.getMessage()).build();
           return outBound;
       } else if (Func.isEmpty(inbound.getRiskPointType())) {
           CheckMsgDTO msgDTO = CheckMsgDTO.paramError("riskPointType 不可为空");
           outBound = outBound.toBuilder().setCode(msgDTO.getCode()).setMessage(msgDTO.getMessage()).build();
           return outBound;
       } else if (Func.isEmpty(inbound.getImagesUrl())) {
           CheckMsgDTO msgDTO = CheckMsgDTO.paramError("imagesUrl 不可为空");
           outBound = outBound.toBuilder().setCode(msgDTO.getCode()).setMessage(msgDTO.getMessage()).build();
           return outBound;
       }

       // 校验值是否正确
       if (Func.isEmpty(DeptCodeMapCache.getByDeptCode(inbound.getCityIndustryUnit()))) {
           CheckMsgDTO msgDTO = CheckMsgDTO.paramError("cityIndustryUnit 值不正确");
           outBound = outBound.toBuilder().setCode(msgDTO.getCode()).setMessage(msgDTO.getMessage()).build();
           return outBound;
       } else if (Func.isEmpty(DeptCodeMapCache.getByDeptCode(inbound.getDistrictRegulatoryDept()))) {
           CheckMsgDTO msgDTO = CheckMsgDTO.paramError("districtRegulatoryDept 值不正确");
           outBound = outBound.toBuilder().setCode(msgDTO.getCode()).setMessage(msgDTO.getMessage()).build();
           return outBound;
       } else if (Func.isEmpty(DeptCodeMapCache.getByDeptCode(inbound.getDistrictAnWeiDept()))) {
           CheckMsgDTO msgDTO = CheckMsgDTO.paramError("districtAnWeiDept 值不正确");
           outBound = outBound.toBuilder().setCode(msgDTO.getCode()).setMessage(msgDTO.getMessage()).build();
           return outBound;
       }

       // 开始校验字段的值,是否正确 获取字典总是获取不到, 提示获取数据失败
       //        if (Func.isEmpty(DictCache.getValue(DictEnum.DATA_OPERATE_TYPE, inbound.getDataOperate()))) {
       //            CheckMsgDTO msgDTO = CheckMsgDTO.paramError("dataOperate 值不正确");
       //            outBound = outBound.toBuilder().setCode(msgDTO.getCode()).setMessage(msgDTO.getMessage()).build();
       //            return outBound;
       //        } else if (Func.isEmpty(DictCache.getValue(DictEnum.RISK_POINT_TYPE, inbound.getRiskPointType()))) {
       //            CheckMsgDTO msgDTO = CheckMsgDTO.paramError("riskPointType 值不正确");
       //            outBound = outBound.toBuilder().setCode(msgDTO.getCode()).setMessage(msgDTO.getMessage()).build();
       //            return outBound;
       //        }
       return outBound;
   }

   private RmqMsgRecord convertToRmqMsgRecord(String msgId, String hasTrue, String jsonString,
       EnterpriseRiskDistributionMapInboundVO.EnterpriseRiskDistributionMapInbound inbound) {
       RmqMsgRecord copy = BeanUtil.copy(inbound, RmqMsgRecord.class);
       long id = IdWorker.getId(copy);
       BeanCommonPropertiesSetWrapper.build().setEnterpriseCommonProperties(copy, id);
       copy.setRmqTopicName(RocketMqConstant.TOPIC_ENTERPRISE_RISK_DISTRIBUTION_MAP_INBOUND);
       copy.setRmqMsgKey(msgId);
       copy.setHasTrue(hasTrue);
       copy.setReportTime(DateUtil.parse(inbound.getReportTime(), "yyyy-MM-dd HH:mm:ss"));
       copy.setOrgData(JSONUtil.parse(jsonString));
       return copy;
   }

   private EnterpriseRiskDistributeMap convertToEntity(
       EnterpriseRiskDistributionMapInboundVO.EnterpriseRiskDistributionMapInbound inbound) {
       EnterpriseRiskDistributeMap copy = BeanUtil.copy(inbound, EnterpriseRiskDistributeMap.class);
       long id = IdWorker.getId(copy);
       BeanCommonPropertiesSetWrapper.build().setEnterpriseCommonProperties(copy, id);
       copy.setReportTime(DateUtil.parse(inbound.getReportTime(), "yyyy-MM-dd HH:mm:ss"));
       return copy;
   }

}


2条回答
  •  zhx1994
    zhx1994 (楼主)
    2024-03-06 10:29

    报错日志的异常堆栈能不能提供下。

    作者追问:2024-03-06 10:29

    错误已经找到, 感谢回复

    1. 错误原因:FeignConfig 配置触发 NPE

    FeignConfig 是自己加的:之前项目遇到 feign 调用没有透传 header 的问题,于是自己增加了该配置。
    该配置之前未做 null 判断。


    image.png


    controller调用

    servletRequestAttributes是可以获取到值的

    RocketMQ 消费者调用 feign 时,消费线程中没有绑定 HTTP 请求,servletRequestAttributes 获取到的是 null,从而触发 NPE。


    2.feign调用直接进入fallback回调;  无异常堆栈信息;  请参考此文章让控制台输出异常堆栈信息
    feign调用接口成功后直接进入fallback回调方法排查_feign接口的回调接口-CSDN博客

    3. 修改自己的 feign 配置,让其输出异常堆栈
    image.png

    image.png




提交回复