1.背景
随着业务的发展,服务进行拆分,由原先的一个kafka根据业务线拆分成了多个kafka,现在底层又不支持多kafka数据源,只能自己对kafka进行二次封装
2.kafka配置说明
下面配置了两套kafka,
主kafka为 primary: kafka_db_data1和第二个数据源kafka_db_data2,以后有第三个kafka可以按照同样的方式进行配置。
kafkas:kafka:enabled: trueprimary: kafka_db_data1producer:enabled: trueconsumer:enabled: truesource:kafka_db_data1:bootstrap-servers: 172.16.**.**:9092,172.16.**.**:9092,172.16.**.**:9092producer:enabled: trueretries: 0acks: 1batch-size: 16384properties:linger: 50buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:enabled: truegroup-id: test_groupmax-poll-records: 100fetch-max-wait: 100fetch-min-size: 100enable-auto-commit: trueheartbeat-interval: 1000client-id: ""auto-offset-reset: latestauto-commit-interval: 1000workNum: 1pollTime: 100kafka_db_data2:bootstrap-servers: 172.16.**.**:9092,172.16.**.**:9092,172.16.**.**:9092producer:enabled: trueretries: 0acks: 1batch-size: 16384properties:linger: 50buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:enabled: truegroup-id: test_groupmax-poll-records: 100fetch-max-wait: 100fetch-min-size: 100enable-auto-commit: trueheartbeat-interval: 1000client-id: ""auto-offset-reset: latestauto-commit-interval: 1000workNum: 1pollTime: 100
3.加载kafka源
kafka配置加载
3.1KafkaProperties
@ConditionalOnProperty(prefix = "kafkas.kafka",name = "enabled")
@ConfigurationProperties(prefix = "kafkas.kafka")
public class KafkaProperties {private String primary;private Map<String, KafkaDetail> source;}
KafkaDetail
public class KafkaDetail {private String bootstrapServers;private Producer producer;private Consumer consumer;private Listener listener;}
Producer
public class Producer {private int retries;private int batchSize;private int bufferMemory;private String acks;private Properties properties;private Boolean enabled;}
Consumer
public class Consumer {private String groupId;private Integer maxPollRecords = 100;private String fetchMaxWait;private String fetchMinSize;private boolean enableAutoCommit = true;private Integer heartbeatInterval = 3000;private String clientId;private String autoOffsetRest = "latest";private Integer autoCommitInterval = 5000;private Integer workerNum = 1;private Boolean enabled;private Integer pollTime = 100;private Integer retries = 0;}
3.2KafkaAutoConfiguration
加载配置类,此类初始化了多个producer,将多个producer初始化好后,放入Map,交由spring进行管理
package com.rongss.boot.middleware.kafka;import com.google.common.collect.Maps;
import com.rongss.boot.middleware.kafka.producer.KafkaTemplate;
import com.rongss.boot.middleware.kafka.producer.impl.KafkaTemplateImpl;
import com.rongss.boot.middleware.kafka.properties.KafkaDetail;
import com.rongss.boot.middleware.kafka.properties.KafkaProperties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;@Configuration
@ConditionalOnProperty(prefix = "kafkas.kafka",name = "enabled")
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaAutoConfiguration {private KafkaProducer<String, String> kafkaProducer;@Autowiredprivate KafkaProperties kafkaProperties;public Properties producerConfigs(KafkaDetail kafkaDetail) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaDetail.getBootstrapServers());props.put(ProducerConfig.RETRIES_CONFIG, kafkaDetail.getProducer().getRetries());props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaDetail.getProducer().getBatchSize());props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaDetail.getProducer().getProperties().getLinger());props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaDetail.getProducer().getBufferMemory());props.put(ProducerConfig.ACKS_CONFIG, kafkaDetail.getProducer().getAcks());props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}public KafkaProducer<String, String> kafkaProducer(KafkaDetail kafkaDetail) {kafkaProducer = new KafkaProducer<>(producerConfigs(kafkaDetail));return kafkaProducer;}@Bean@ConditionalOnProperty(prefix = "kafkas.kafka.producer",name = "enabled")public Map<String,KafkaTemplate> kafkaTemplate(){Map<String,KafkaDetail> sources = kafkaProperties.getSource();Map<String,KafkaTemplate> templateMap = new HashMap<>();for (String key : sources.keySet()){KafkaDetail kafkaDetail = sources.get(key);if(kafkaDetail.getProducer() != null && kafkaDetail.getProducer().getEnabled()){KafkaTemplate template = new KafkaTemplateImpl(kafkaProducer(kafkaDetail));templateMap.put(key,template);}}return templateMap;}public Map<String, Object> consumerConfigs(KafkaDetail kafkaDetail) {Map<String, Object> props = Maps.newHashMap();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaDetail.getBootstrapServers());props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaDetail.getConsumer().getGroupId());props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaDetail.getConsumer().getMaxPollRecords());props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaDetail.getConsumer().isEnableAutoCommit());props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, kafkaDetail.getConsumer().getFetchMaxWait());props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaDetail.getConsumer().getHeartbeatInterval());props.put(ConsumerConfig.CLIENT_ID_CONFIG, kafkaDetail.getConsumer().getClientId());props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaDetail.getConsumer().getAutoCommitInterval());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaDetail.getConsumer().getAutoOffsetRest());return props;}}
producer发消息
消息实体
public class MessageEvent implements Serializable {private static final long serialVersionUID = -2624253925403159396L;/*** 事件序列ID*/private String txId;/*** 话题的名字*/private String topic;/*** 话题的名字*/private String tag;/*** 需要传递的领域对象*/private Object body;/*** 传递的领域对象的唯一标识,用来构建消息的唯一标识,不检测重复,可以为空,不影响消息收发*/private String bodyKey;/*** 事件创建时间*/private long createdDate = System.currentTimeMillis();/*** 消息类型*/private MessageType messageType;/*** 方便的生成TxId的方法** @return*/public String generateTxId() {if (null == txId) {txId = getTopic() + ":" + getTag() + ":";if (StringUtil.isNullOrEmpty(bodyKey)) {txId = txId + getCreatedDate() + ":" + UUID.randomUUID().toString();} else {txId = txId + bodyKey;}}return txId;}}
producer接口类
public interface KafkaTemplate {Future<RecordMetadata> send(MessageEvent event);Future<RecordMetadata> send(MessageEvent event, Callback callback);
}
实现
public class KafkaTemplateImpl implements KafkaTemplate {private Logger log = LoggerFactory.getLogger(KafkaTemplateImpl.class);private KafkaProducer kafkaProducer;public KafkaTemplateImpl(KafkaProducer kafkaProducer){this.kafkaProducer = kafkaProducer;}private ProducerRecord<String, Object> getRecord(MessageEvent event){if(event == null) {throw new RuntimeException("event is null.");}if (StringUtils.isEmpty(event.getTopic()) || null == event.getBody()) {throw new RuntimeException("topic, or body is null.");}return new ProducerRecord<>(event.getTopic(), event.getBody());}@Overridepublic Future<RecordMetadata> send(MessageEvent event) {return kafkaProducer.send(getRecord(event));}@Overridepublic Future<RecordMetadata> send(MessageEvent event, Callback callback) {return kafkaProducer.send(getRecord(event),callback);}}
多实例多消费者
首先在这里准备了几个自定义注解
@ListenerHandler //写在类上,让spring扫描到,表示该类使用了kafka的消费者
@ListenerExecute //写在具体方法上,监听器会将相应的topic数据推送至该方法
ListenerExecute注解
@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface ListenerExecute {String message() default "";String topic() default "";String tags() default "";int retry() default 0;MessageType action();}
KafkaListenerProcessor
主要是将ListenerExecute注解 和 KafkaListener进行关联
@Configuration
public class ListenerProcessor implements BeanPostProcessor {@Autowiredprivate ApplicationContext applicationContext;@Overridepublic Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {return bean;}@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {Class<?> clazz = AopUtils.getTargetClass(bean);ListenerExecute annotation = clazz.getAnnotation(ListenerExecute.class);if (null != annotation) {final Method[] methods = bean.getClass().getMethods();for (Method method : methods) {final ListenerExecute methodAnnotation = method.getAnnotation(ListenerExecute.class);if (methodAnnotation != null) {final MessageType action = methodAnnotation.action();addListener(methodAnnotation.source(), bean, method, action, methodAnnotation.topic(), methodAnnotation.tags(),methodAnnotation.reMsg());}}}return bean;}private void addListener(String source, Object bean, Method method, MessageType action, String topic, String tags,String reMsg){KafkaListener kafkaListener = applicationContext.getBean(KafkaListener.class);if(null == kafkaListener){throw new NullPointerException("KafkaListener is null ! please check kafka.consumer.enabled = true");}kafkaListener.addListener(source, bean, method, topic, tags, reMsg);}
}
具体监听类
此类初始化时启动,扫描项目中注解@ListenerHandler 和 @ListenerExecute,监听相应的方法,并把数据推送给消费的具体方法
@Component
@ConditionalOnProperty(prefix = "kafkas.kafka.consumer",value = "enabled")
public class KafkaListener implements Listener {@Autowiredprivate Environment env;@Autowiredprivate KafkaAutoConfiguration kafkaAutoConfiguration;@Autowiredprivate KafkaProperties kafkaProperties;@Overridepublic void addListener(String source, Object bean,Method method, String topicKey, String tags, String reMsg) {Map<String, KafkaDetail> kafkaConfings = kafkaProperties.getSource();KafkaDetail kafkaDetail = kafkaConfings.get(source);if(kafkaDetail.getConsumer() != null && kafkaDetail.getConsumer().getEnabled()){Integer workNum = kafkaDetail.getConsumer().getWorkerNum();for (int i=0;i<workNum;i++){log.info("init kafka consumer start source:{}", source);openListener(kafkaDetail, bean, method, topicKey);}}}private void openListener(KafkaDetail kafkaDetail, Object bean,Method method, String topicKey){topicKey = topicKey.replace(" ","");if(StringUtils.isEmpty(topicKey)){throw new RuntimeException("topic is null");}final String topicValue = topicKey.substring(2,topicKey.length()-1);new Thread(){@Overridepublic void run() {KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaAutoConfiguration.consumerConfigs(kafkaDetail));String topic = env.getProperty(topicValue);consumer.subscribe(Arrays.asList(topic));Parameter[] parameters = method.getParameters();while (true){try {ConsumerRecords<String, String> records = consumer.poll(kafkaDetail.getConsumer().getPollTime());if(!records.isEmpty()){List<String> list = new ArrayList<>(records.count());for (ConsumerRecord record : records) {list.add(record.value().toString());}Object[] params = new Object[parameters.length];params[0] = list;log.info("kafka commit info fail topic={}, message={}",topic,JSON.toJSON(list));Boolean ret = invoke(bean, method,params);if(!kafkaDetail.getConsumer().isEnableAutoCommit()){if(ret){log.info("kafka commit info fail ret={}, topic={}, message={}",ret,topic,JSON.toJSON(list));consumer.commitAsync();} else if(!ret && kafkaDetail.getConsumer().getRetries().intValue() > 0){for (int i = 1;i<kafkaDetail.getConsumer().getRetries().intValue();i++) {ret = invoke(bean, method, params);if (ret) {consumer.commitAsync();break;}if(i == kafkaDetail.getConsumer().getRetries().intValue() - 1){log.info("kafka commit info retries fail topic={}, message={}",topic,JSON.toJSON(list));consumer.commitAsync();}}}}}} catch (IllegalAccessException e) {log.error("消费器不合法访问!",e);} catch (InvocationTargetException e) {log.error("消费器调用handler失败!",e);} catch (Throwable e){log.error("Throwable!Consumer error",e);}}}}.start();}public Boolean invoke(Object bean, Method method, Object[] params) throws Exception{Object obj = method.invoke(bean,params);return obj == null ? false : (Boolean)obj;}public String[] getMethodParamName(Class<?> clazz, String methodName, Class<?>[] parameterTypes) {try {Method method = clazz.getDeclaredMethod(methodName, parameterTypes);LocalVariableTableParameterNameDiscoverer parameterNameDiscoverer = new LocalVariableTableParameterNameDiscoverer();return parameterNameDiscoverer.getParameterNames(method);} catch (Exception e) {e.printStackTrace();}return null;}
}
使用方式
参数详解:
@Value 表示从spring管理的Map中区key值为kafka_db_data1的KafkaTemplate对象
@ListenerExecute 定义了source来源,可以匹配到kafka的数据源,topic可配置
@Slf4j
@Component
@ListenerHandler
public class KafkaDemo {@Value("#{kafkaTemplate.kafka_db_data1}")private KafkaTemplate kafkaDbData1;public void kafkaProducerDemo(){MessageEvent event = new MessageEvent();event.setBody("我是具体数据");event.setTopic("demo_topic");kafkaDbData1.send(event);}@ListenerExecute(source = "kafka_db_data1", topic = "${kafkas.kafka.source.kafka_db_data.topic.comments}", action = MessageType.KAFKA, reMsg = "data")public void kafkaDemo(List<String> data) {log.info(JSON.toJSONString(data));}
}