当前位置: 代码迷 >> 综合 >> Springboot RocketMQ Starter Integration
  详细解决方案

Springboot RocketMQ Starter Integration

热度:55   发布时间:2023-10-13 15:14:03.0

文章目录

  • 一 Springboot2.0.6集成Rocketmq Starter
    • 1 引入依赖
    • 2 yml配置
    • 3 生产者配置
    • 4 消费者配置
    • 5 消费者重写
  • 二 Rocketmq Stater分析

一 Springboot2.0.6集成Rocketmq Starter

1 引入依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></exclusion><exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></exclusion></exclusions><version>2.1.0</version>
</dependency>

2 yml配置

rocketmq:name-server: 192.168.0.213:9876producer:group: ${
    spring.application.name}sendMessageTimeout: 3000compressMessageBodyThreshold: 4096retryTimesWhenSendFailed: 0retryTimesWhenSendAsyncFailed: 0retryNextServer: falsemaxMessageSize: 4194304

3 生产者配置

    @Bean@ConditionalOnMissingBean({
    DefaultMQProducer.class})@ConditionalOnProperty(prefix = "rocketmq",value = {
    "name-server", "producer.group"})public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {
    System.out.println(rocketMQProperties.getNameServer());RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();String nameServer = rocketMQProperties.getNameServer();String groupName = IataMqConstant.importTypeBProducer;Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");String accessChannel = rocketMQProperties.getAccessChannel();String ak = rocketMQProperties.getProducer().getAccessKey();String sk = rocketMQProperties.getProducer().getSecretKey();boolean isEnableMsgTrace = rocketMQProperties.getProducer().isEnableMsgTrace();String customizedTraceTopic = rocketMQProperties.getProducer().getCustomizedTraceTopic();DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic);producer.setNamesrvAddr(nameServer);if (!StringUtils.isEmpty(accessChannel)) {
    producer.setAccessChannel(AccessChannel.valueOf(accessChannel));}producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());producer.setMaxMessageSize(producerConfig.getMaxMessageSize());producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());log.info("Rocketmq Producer Started !!!");return producer;}

4 消费者配置

/*** Rocketmq 消费者* 这里不能使用@Configuration注解** @author pengjunjie* @date 2022/2/15 17:41*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.import.typeBQueue}",consumerGroup = "${rocketmq.import.typeBConsumer}",enableMsgTrace = false
)
@RequiredArgsConstructor
public class RocketMQConfig implements RocketMQListener<String> {
    private final IataMessageStorageService iataMessageStorageService;private final TypeBRecordDao typeBRecordDao;/*** 有个maxReconsumeTimes的参数设置retry消费次数* 抛出异常,上层捕获返回 ReconsumerLater, 重复消费maxReconsumeTimes后 进入%DLQ%ConsumerGroup队列** @param typeBText*/@Overridepublic void onMessage(String typeBText) {
    JSONObject jsonObject = JSONObject.parseObject(typeBText);Long typeBId = jsonObject.getLong("typeBId");String typeBContent = jsonObject.getString("typeBContent");// 重复消费Long existTypeBId = typeBRecordDao.selectByTypeBId(typeBId);if (existTypeBId != null) {
    return;}iataMessageStorageService.StorageImportMessage(typeBId, typeBContent);}
}

5 消费者重写

/*** 重写rocketmq消费者属性** @author pengjunjie* @date 2022/2/17 11:24*/
@Component
public class RocketMQBeanPostProcessor implements BeanPostProcessor {
    @Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    if (beanName.contains("DefaultRocketMQListenerContainer")) {
    DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;container.getConsumer().setMaxReconsumeTimes(3);}return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);}
}

二 Rocketmq Stater分析

核心类 RocketMQAutoConfiguration,Springboot自动装配原理[META-INFO/spring.factories]

@Configuration
@EnableConfigurationProperties({
    RocketMQProperties.class})
@ConditionalOnClass({
    MQAdmin.class})
@ConditionalOnProperty(prefix = "rocketmq",value = {
    "name-server"},matchIfMissing = true
)
// ListenerContainerConfiguration 消费者启动、注入监听器
@Import({
    MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class, RocketMQTransactionConfiguration.class})
@AutoConfigureAfter({
    MessageConverterConfiguration.class})
@AutoConfigureBefore({
    RocketMQTransactionConfiguration.class})
public class RocketMQAutoConfiguration {
    private static final Logger log = LoggerFactory.getLogger(RocketMQAutoConfiguration.class);public static final String ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME = "rocketMQTemplate";@Autowiredprivate Environment environment;public RocketMQAutoConfiguration() {
    }@PostConstructpublic void checkProperties() {
    String nameServer = (String)this.environment.getProperty("rocketmq.name-server", String.class);log.debug("rocketmq.nameServer = {}", nameServer);if (nameServer == null) {
    log.warn("The necessary spring property 'rocketmq.name-server' is not defined, all rockertmq beans creation are skipped!");}}// 配置生产者@Bean@ConditionalOnMissingBean({
    DefaultMQProducer.class})@ConditionalOnProperty(prefix = "rocketmq",value = {
    "name-server", "producer.group"})public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {
    Producer producerConfig = rocketMQProperties.getProducer();String nameServer = rocketMQProperties.getNameServer();String groupName = producerConfig.getGroup();Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");String accessChannel = rocketMQProperties.getAccessChannel();String ak = rocketMQProperties.getProducer().getAccessKey();String sk = rocketMQProperties.getProducer().getSecretKey();boolean isEnableMsgTrace = rocketMQProperties.getProducer().isEnableMsgTrace();String customizedTraceTopic = rocketMQProperties.getProducer().getCustomizedTraceTopic();DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic);producer.setNamesrvAddr(nameServer);if (!StringUtils.isEmpty(accessChannel)) {
    producer.setAccessChannel(AccessChannel.valueOf(accessChannel));}producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());producer.setMaxMessageSize(producerConfig.getMaxMessageSize());producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());return producer;}@Bean(destroyMethod = "destroy")@ConditionalOnBean({
    DefaultMQProducer.class})@ConditionalOnMissingBean(name = {
    "rocketMQTemplate"})public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer, RocketMQMessageConverter rocketMQMessageConverter) {
    RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();rocketMQTemplate.setProducer(mqProducer);rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());return rocketMQTemplate;}
}

ListenerContainerConfiguration

// 消费者配置,默认使用DefaultPushConsumer
@Configuration
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
    private static final Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class);private ConfigurableApplicationContext applicationContext;private AtomicLong counter = new AtomicLong(0L);private StandardEnvironment environment;private RocketMQProperties rocketMQProperties;private RocketMQMessageConverter rocketMQMessageConverter;public ListenerContainerConfiguration(RocketMQMessageConverter rocketMQMessageConverter, StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
    this.rocketMQMessageConverter = rocketMQMessageConverter;this.environment = environment;this.rocketMQProperties = rocketMQProperties;}public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.applicationContext = (ConfigurableApplicationContext)applicationContext;}public void afterSingletonsInstantiated() {
    // 获取加了RocketMQMessageListener注解的Bean列表Map<String, Object> beans = (Map)this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class).entrySet().stream().filter((entry) -> {
    return !ScopedProxyUtils.isScopedTarget((String)entry.getKey());}).collect(Collectors.toMap(Entry::getKey, Entry::getValue));// 对这些Bean启动消费者和监听beans.forEach(this::registerContainer);}private void registerContainer(String beanName, Object bean) {
    Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
    throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName());} else if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
    throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());} else {
    // 获取RocketMQMessageListener注解,获取注解中的消费者组、topic等信息RocketMQMessageListener annotation = (RocketMQMessageListener)clazz.getAnnotation(RocketMQMessageListener.class);String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());String topic = this.environment.resolvePlaceholders(annotation.topic());boolean listenerEnabled = (Boolean)((Map)this.rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)).getOrDefault(topic, true);if (!listenerEnabled) {
    log.debug("Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.", consumerGroup, topic);} else {
    this.validate(annotation);String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), this.counter.incrementAndGet());GenericApplicationContext genericApplicationContext = (GenericApplicationContext)this.applicationContext;genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> {
    return this.createRocketMQListenerContainer(containerBeanName, bean, annotation);}, new BeanDefinitionCustomizer[0]);DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer)genericApplicationContext.getBean(containerBeanName, DefaultRocketMQListenerContainer.class);if (!container.isRunning()) {
    try {
    // 容器启动,defaultPushConsumer#startcontainer.start();} catch (Exception var12) {
    log.error("Started container failed. {}", container, var12);throw new RuntimeException(var12);}}log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);}}}// 创建DefaultRocketMQListenerContainer// 内部类 DefaultMessageListenerConcurrently或者DefaultMessageListenerOrderly会被执行private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) {
    DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();container.setRocketMQMessageListener(annotation);String nameServer = this.environment.resolvePlaceholders(annotation.nameServer());nameServer = StringUtils.isEmpty(nameServer) ? this.rocketMQProperties.getNameServer() : nameServer;String accessChannel = this.environment.resolvePlaceholders(annotation.accessChannel());container.setNameServer(nameServer);if (!StringUtils.isEmpty(accessChannel)) {
    container.setAccessChannel(AccessChannel.valueOf(accessChannel));}container.setTopic(this.environment.resolvePlaceholders(annotation.topic()));String tags = this.environment.resolvePlaceholders(annotation.selectorExpression());if (!StringUtils.isEmpty(tags)) {
    container.setSelectorExpression(tags);}container.setConsumerGroup(this.environment.resolvePlaceholders(annotation.consumerGroup()));// 这个Bean是我们加了RocketMQListener注解的Bean// DefaultMessageListenerConcurrently或者DefaultMessageListenerOrderly执行其实调用bean的onMessage方法// 不过所有的状态处理在 DefaultMessageListenerConcurrently或者DefaultMessageListenerOrderly中if (RocketMQListener.class.isAssignableFrom(bean.getClass())) {
    container.setRocketMQListener((RocketMQListener)bean);} else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
    container.setRocketMQReplyListener((RocketMQReplyListener)bean);}container.setMessageConverter(this.rocketMQMessageConverter.getMessageConverter());container.setName(name);return container;}// ......
}
  相关解决方案